Skip to content

Commit

Permalink
chore: disable persistent journaling feature (dragonflydb#1549)
Browse files Browse the repository at this point in the history
1. We have not worked on it for many months
2. It's not on the short term roadmap
3. It complicates the code around the replication.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Jul 17, 2023
1 parent 2c04311 commit 9448220
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 28 deletions.
10 changes: 7 additions & 3 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);

if (sub_cmd == "JOURNAL" && args.size() >= 2) {
/*if (sub_cmd == "JOURNAL" && args.size() >= 2) {
return Journal(args, cntx);
}
}*/

if (sub_cmd == "THREAD") {
return Thread(args, cntx);
Expand Down Expand Up @@ -139,6 +139,7 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
rb->SendError(kSyntaxErr);
}

#if 0
void DflyCmd::Journal(CmdArgList args, ConnectionContext* cntx) {
DCHECK_GE(args.size(), 2u);
ToUpper(&args[1]);
Expand Down Expand Up @@ -201,6 +202,8 @@ void DflyCmd::Journal(CmdArgList args, ConnectionContext* cntx) {
return rb->SendError(reply, kSyntaxErrType);
}

#endif

void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
util::ProactorPool* pool = shard_set->pool();
Expand Down Expand Up @@ -472,8 +475,9 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
flow->saver.reset();
};

sf_->journal()->StartInThread();

// Shard can be null for io thread.
CHECK(!sf_->journal()->OpenInThread(false, ""sv)); // can only happen in persistent mode.
if (shard != nullptr) {
flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard);
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class DflyCmd {
private:
// JOURNAL [START/STOP]
// Start or stop journaling.
void Journal(CmdArgList args, ConnectionContext* cntx);
// void Journal(CmdArgList args, ConnectionContext* cntx);

// THREAD [to_thread]
// Return connection thread index or migrate to another thread.
Expand Down
24 changes: 15 additions & 9 deletions src/server/journal/journal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,20 @@ thread_local JournalSlice journal_slice;
Journal::Journal() {
}

error_code Journal::OpenInThread(bool persistent, string_view dir) {
void Journal::StartInThread() {
journal_slice.Init(unsigned(ProactorBase::GetIndex()));

ServerState::tlocal()->set_journal(this);
EngineShard* shard = EngineShard::tlocal();
if (shard) {
shard->set_journal(this);
}
}

#if 0
error_code Journal::OpenInThread(bool persistent, string_view dir) {


error_code ec;

if (persistent) {
Expand All @@ -40,14 +51,9 @@ error_code Journal::OpenInThread(bool persistent, string_view dir) {
}
}

ServerState::tlocal()->set_journal(this);
EngineShard* shard = EngineShard::tlocal();
if (shard) {
shard->set_journal(this);
}

return ec;
}
#endif

error_code Journal::Close() {
CHECK(lameduck_.load(memory_order_relaxed));
Expand All @@ -65,12 +71,12 @@ error_code Journal::Close() {
shard->set_journal(nullptr);
}

auto ec = journal_slice.Close();
/*auto ec = journal_slice.Close();
if (ec) {
lock_guard lk2(ec_mu);
res = ec;
}
}*/
};

shard_set->pool()->AwaitFiberOnAll(close_cb);
Expand Down
4 changes: 4 additions & 0 deletions src/server/journal/journal.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ class Journal {
// and false otherwise.
bool EnterLameDuck(); // still logs ongoing transactions but refuses to start new ones.

void StartInThread();

// Requires: journal is in lameduck mode.
std::error_code Close();

#if 0
// Opens journal inside a Dragonfly thread. Must be called in each thread.
std::error_code OpenInThread(bool persistent, std::string_view dir);
#endif

//******* The following functions must be called in the context of the owning shard *********//

Expand Down
24 changes: 15 additions & 9 deletions src/server/journal/journal_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ namespace fs = std::filesystem;

namespace {

/*
string ShardName(std::string_view base, unsigned index) {
return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".log");
}
*/

} // namespace

Expand All @@ -42,17 +44,18 @@ JournalSlice::JournalSlice() {
}

JournalSlice::~JournalSlice() {
CHECK(!shard_file_);
// CHECK(!shard_file_);
}

void JournalSlice::Init(unsigned index) {
if (ring_buffer_) // calling this function multiple times is allowed and it's a no-op.
return;
// if (ring_buffer_) // calling this function multiple times is allowed and it's a no-op.
// return;

slice_index_ = index;
ring_buffer_.emplace(128); // TODO: to make it configurable
// ring_buffer_.emplace(128); // TODO: to make it configurable
}

#if 0
std::error_code JournalSlice::Open(std::string_view dir) {
CHECK(!shard_file_);
DCHECK_NE(slice_index_, UINT32_MAX);
Expand Down Expand Up @@ -111,15 +114,17 @@ error_code JournalSlice::Close() {

return ec;
}
#endif

void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
DCHECK(ring_buffer_);

// DCHECK(ring_buffer_);
if (entry.opcode != Op::NOOP) {
// TODO: This is preparation for AOC style journaling, currently unused.
RingItem item;
item.lsn = lsn_;
lsn_++;
// TODO: This is preparation for AOC style journaling, currently unused.
#if 0
RingItem item;
item.lsn = prev_lsn;

item.opcode = entry.opcode;
item.txid = entry.txid;
VLOG(1) << "Writing item [" << item.lsn << "]: " << entry.ToString();
Expand All @@ -131,6 +136,7 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
CHECK_EC(ec);
file_offset_ += line.size();
}
#endif
}

{
Expand Down
13 changes: 7 additions & 6 deletions src/server/journal/journal_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <string_view>

#include "base/ring_buffer.h"
#include "core/uring.h"
#include "server/common.h"
#include "server/journal/types.h"

Expand All @@ -23,9 +22,11 @@ class JournalSlice {

void Init(unsigned index);

#if 0
std::error_code Open(std::string_view dir);

std::error_code Close();
#endif

// This is always the LSN of the *next* journal entry.
LSN cur_lsn() const {
Expand All @@ -36,9 +37,9 @@ class JournalSlice {
return status_ec_;
}

// Whether the file-based journaling is open.
// Whether the journaling is open.
bool IsOpen() const {
return bool(shard_file_);
return slice_index_ != UINT32_MAX;
}

void AddLogRecord(const Entry& entry, bool await);
Expand All @@ -53,9 +54,9 @@ class JournalSlice {
private:
struct RingItem;

std::string shard_path_;
std::unique_ptr<LinuxFile> shard_file_;
std::optional<base::RingBuffer<RingItem>> ring_buffer_;
// std::string shard_path_;
// std::unique_ptr<LinuxFile> shard_file_;
// std::optional<base::RingBuffer<RingItem>> ring_buffer_;

mutable util::SharedMutex cb_mu_;
std::vector<std::pair<uint32_t, ChangeCallback>> change_cb_arr_ ABSL_GUARDED_BY(cb_mu_);
Expand Down

0 comments on commit 9448220

Please sign in to comment.