diff --git a/src/raft.cpp b/src/raft.cpp index 8be6064..d4f26d6 100644 --- a/src/raft.cpp +++ b/src/raft.cpp @@ -253,12 +253,9 @@ void TRaft::OnAppendEntries(TMessageHolder message) { void TRaft::OnCommandRequest(TMessageHolder command, const std::shared_ptr& replyTo) { auto& log = State->Log; - auto dataSize = command->Len - sizeof(TCommandRequest); - auto entry = NewHoldedMessage(sizeof(TLogEntry)+dataSize); - memcpy(entry->Data, command->Data, dataSize); - entry->Term = State->CurrentTerm; - log.push_back(entry); - auto index = log.size()-1; + auto entry = Rsm->Prepare(std::move(command), State->CurrentTerm); + log.emplace_back(std::move(entry)); + auto index = log.size(); if (replyTo) { auto mes = NewHoldedMessage(TCommandResponse {.Index = index}); waiting.emplace(TWaiting{mes->Index, mes, replyTo}); @@ -368,9 +365,19 @@ void TRaft::Process(ITimeSource::Time now, TMessageHolder message, con } } -void TRaft::ProcessWaiting() { +void TRaft::ProcessCommitted() { auto commitIndex = VolatileState->CommitIndex; - while (!waiting.empty() && waiting.top().Index <= commitIndex) { + if (commitIndex > 0) { + for (auto i = VolatileState->LastApplied; i <= commitIndex; i++) { + Rsm->Write(State->Log[i-1]); + } + VolatileState->LastApplied = commitIndex; + } +} + +void TRaft::ProcessWaiting() { + auto lastApplied = VolatileState->LastApplied; + while (!waiting.empty() && waiting.top().Index <= lastApplied) { auto w = waiting.top(); waiting.pop(); w.ReplyTo->Send(std::move(w.Message)); } @@ -403,6 +410,7 @@ void TRaft::LeaderTimeout(ITimeSource::Time now) { } } + ProcessCommitted(); ProcessWaiting(); } diff --git a/src/raft.h b/src/raft.h index 1e91471..682ffbf 100644 --- a/src/raft.h +++ b/src/raft.h @@ -143,6 +143,7 @@ class TRaft { TMessageHolder CreateVote(uint32_t nodeId); TMessageHolder CreateAppendEntries(uint32_t nodeId); + void ProcessCommitted(); void ProcessWaiting(); ITimeSource::Time MakeElection(ITimeSource::Time now);