Skip to content

Commit

Permalink
Safe applying logs on rsm
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius committed Mar 3, 2024
1 parent cd762ae commit 51e2683
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 12 deletions.
14 changes: 8 additions & 6 deletions examples/kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ TMessageHolder<TMessage> TKv::Read(TMessageHolder<TCommandRequest> message, uint
}
}

void TKv::Write(TMessageHolder<TLogEntry> message) {
auto writeKv = message.Cast<TWriteKv>();
std::string_view k(writeKv->Data, writeKv->KeySize);
std::string_view v(writeKv->Data + writeKv->KeySize, writeKv->ValSize);
H[std::string(k)] = std::string(v);
return;
void TKv::Write(TMessageHolder<TLogEntry> message, uint64_t index) {
if (index < LastAppliedIndex) {
auto writeKv = message.Cast<TWriteKv>();
std::string_view k(writeKv->Data, writeKv->KeySize);
std::string_view v(writeKv->Data + writeKv->KeySize, writeKv->ValSize);
H[std::string(k)] = std::string(v);
LastAppliedIndex = index;
}
}

TMessageHolder<TLogEntry> TKv::Prepare(TMessageHolder<TCommandRequest> command, uint64_t term) {
Expand Down
3 changes: 2 additions & 1 deletion examples/kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
class TKv: public IRsm {
public:
TMessageHolder<TMessage> Read(TMessageHolder<TCommandRequest> message, uint64_t index) override;
void Write(TMessageHolder<TLogEntry> message) override;
void Write(TMessageHolder<TLogEntry> message, uint64_t index) override;
TMessageHolder<TLogEntry> Prepare(TMessageHolder<TCommandRequest> message, uint64_t term) override;

private:
uint64_t LastAppliedIndex = 0;
std::unordered_map<std::string, std::string> H;
};
9 changes: 6 additions & 3 deletions src/raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ TMessageHolder<TMessage> TDummyRsm::Read(TMessageHolder<TCommandRequest> message
}
}

void TDummyRsm::Write(TMessageHolder<TLogEntry> message)
void TDummyRsm::Write(TMessageHolder<TLogEntry> message, uint64_t index)
{
Log.emplace_back(std::move(message));
if (LastAppliedIndex < index) {
Log.emplace_back(std::move(message));
LastAppliedIndex = index;
}
}

TMessageHolder<TLogEntry> TDummyRsm::Prepare(TMessageHolder<TCommandRequest> command, uint64_t term)
Expand Down Expand Up @@ -381,7 +384,7 @@ void TRaft::Process(ITimeSource::Time now, TMessageHolder<TMessage> message, con
void TRaft::ProcessCommitted() {
auto commitIndex = VolatileState->CommitIndex;
for (auto i = VolatileState->LastApplied+1; i <= commitIndex; i++) {
Rsm->Write(State->Log[i-1]);
Rsm->Write(State->Log[i-1], i);
}
VolatileState->LastApplied = commitIndex;
}
Expand Down
5 changes: 3 additions & 2 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ struct INode {
struct IRsm {
virtual ~IRsm() = default;
virtual TMessageHolder<TMessage> Read(TMessageHolder<TCommandRequest> message, uint64_t index) = 0;
virtual void Write(TMessageHolder<TLogEntry> message) = 0;
virtual void Write(TMessageHolder<TLogEntry> message, uint64_t index) = 0;
virtual TMessageHolder<TLogEntry> Prepare(TMessageHolder<TCommandRequest> message, uint64_t term) = 0;
};

struct TDummyRsm: public IRsm {
TMessageHolder<TMessage> Read(TMessageHolder<TCommandRequest> message, uint64_t index) override;
void Write(TMessageHolder<TLogEntry> message) override;
void Write(TMessageHolder<TLogEntry> message, uint64_t index) override;
TMessageHolder<TLogEntry> Prepare(TMessageHolder<TCommandRequest> message, uint64_t term) override;

private:
uint64_t LastAppliedIndex;
std::vector<TMessageHolder<TLogEntry>> Log;
};

Expand Down

0 comments on commit 51e2683

Please sign in to comment.