Skip to content

Commit

Permalink
Polish
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius committed Dec 16, 2023
1 parent 79b032a commit 51ccd3b
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 18 deletions.
37 changes: 19 additions & 18 deletions src/raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,20 @@ void TRaft::OnAppendEntries(TMessageHolder<TAppendEntriesResponse> message) {
}
}

void TRaft::OnCommandRequest(TMessageHolder<TCommandRequest> command, const std::shared_ptr<INode>& replyTo) {
auto& log = State->Log;
auto dataSize = command->Len - sizeof(TCommandRequest);
auto entry = NewHoldedMessage<TLogEntry>(sizeof(TLogEntry)+dataSize);
memcpy(entry->Data, command->Data, dataSize);
entry->Term = State->CurrentTerm;
log.push_back(entry);
auto index = log.size()-1;
if (replyTo) {
auto mes = NewHoldedMessage(TCommandResponse {.Index = index});
waiting.emplace(TWaiting{mes->Index, mes, replyTo});
}
}

TMessageHolder<TRequestVoteRequest> TRaft::CreateVote(uint32_t nodeId) {
auto mes = NewHoldedMessage(
TMessageEx {.Src = Id, .Dst = nodeId, .Term = State->CurrentTerm},
Expand Down Expand Up @@ -277,7 +291,7 @@ void TRaft::Follower(ITimeSource::Time now, TMessageHolder<TMessage> message) {
if (auto maybeRequestVote = message.Maybe<TRequestVoteRequest>()) {
OnRequestVote(now, std::move(maybeRequestVote.Cast()));
} else if (auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>()) {
OnAppendEntries(now, maybeAppendEntries.Cast());
OnAppendEntries(now, std::move(maybeAppendEntries.Cast()));
}
}

Expand All @@ -287,32 +301,19 @@ void TRaft::Candidate(ITimeSource::Time now, TMessageHolder<TMessage> message) {
} else if (auto maybeRequestVote = message.Maybe<TRequestVoteRequest>()) {
OnRequestVote(now, std::move(maybeRequestVote.Cast()));
} else if (auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>()) {
OnAppendEntries(now, maybeAppendEntries.Cast());
OnAppendEntries(now, std::move(maybeAppendEntries.Cast()));
}
}

void TRaft::Leader(ITimeSource::Time now, TMessageHolder<TMessage> message, const std::shared_ptr<INode>& replyTo) {
if (auto maybeAppendEntries = message.Maybe<TAppendEntriesResponse>()) {
OnAppendEntries(maybeAppendEntries.Cast());
OnAppendEntries(std::move(maybeAppendEntries.Cast()));
} else if (auto maybeCommandRequest = message.Maybe<TCommandRequest>()) {
auto command = maybeCommandRequest.Cast();
auto& log = State->Log;
auto dataSize = command->Len - sizeof(TCommandRequest);
auto entry = NewHoldedMessage<TLogEntry>(sizeof(TLogEntry)+dataSize);
memcpy(entry->Data, command->Data, dataSize);
entry->Term = State->CurrentTerm;
log.push_back(entry);
auto index = log.size()-1;
if (replyTo) {
auto mes = NewHoldedMessage(TCommandResponse {.Index = index});
waiting.emplace(TWaiting{mes->Index, mes, replyTo});
}
OnCommandRequest(std::move(maybeCommandRequest.Cast()), replyTo);
} else if (auto maybeVoteRequest = message.Maybe<TRequestVoteRequest>()) {
OnRequestVote(now, std::move(maybeVoteRequest.Cast()));
} else if (auto maybeVoteResponse = message.Maybe<TRequestVoteResponse>()) {
// skip additional votes
} else if (auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>()) {
OnAppendEntries(now, maybeAppendEntries.Cast());
OnAppendEntries(now, std::move(maybeAppendEntries.Cast()));
}
}

Expand Down
1 change: 1 addition & 0 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class TRaft {
void OnRequestVote(TMessageHolder<TRequestVoteResponse> message);
void OnAppendEntries(ITimeSource::Time now, TMessageHolder<TAppendEntriesRequest> message);
void OnAppendEntries(TMessageHolder<TAppendEntriesResponse> message);
void OnCommandRequest(TMessageHolder<TCommandRequest> message, const std::shared_ptr<INode>& replyTo);

void LeaderTimeout(ITimeSource::Time now);
void CandidateTimeout(ITimeSource::Time now);
Expand Down

0 comments on commit 51ccd3b

Please sign in to comment.