Skip to content

Commit

Permalink
[ISSUE openmessaging#144] node abnormal status detection and recovery
Browse files Browse the repository at this point in the history
Signed-off-by: zhangyang21 <[email protected]>
  • Loading branch information
Git-Yang committed Apr 29, 2022
1 parent eb1b2e2 commit 41872e0
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -111,6 +112,13 @@ public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest req
return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.UNEXPECTED_MEMBER.getCode()));
}

if (memberState.isCandidate() && request.isNeedCheckMemberState()) {
logger.warn("[CHECK_MEMBER_STATE] [HandleHeartBeat] remoteId={} need check member state", request.getLeaderId());
if (request.getTerm() < memberState.currTerm()) {
memberState.recoveryToFollower(request.getTerm(), request.getLeaderId());
}
}

if (request.getTerm() < memberState.currTerm()) {
return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
} else if (request.getTerm() == memberState.currTerm()) {
Expand Down Expand Up @@ -283,10 +291,12 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
break;
}

if (x.getCode() == DLedgerResponseCode.NETWORK_ERROR.getCode())
if (x.getCode() == DLedgerResponseCode.NETWORK_ERROR.getCode()) {
memberState.getPeersLiveTable().put(id, Boolean.FALSE);
else
} else {
memberState.getPeersLiveTable().put(id, Boolean.TRUE);
memberState.getPeersTermTable().put(id, x.getTerm());
}

if (memberState.isQuorum(succNum.get())
|| memberState.isQuorum(succNum.get() + notReadyNum.get())) {
Expand All @@ -305,6 +315,7 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
beatLatch.await(heartBeatTimeIntervalMs, TimeUnit.MILLISECONDS);
if (memberState.isQuorum(succNum.get())) {
lastSuccHeartBeatTime = System.currentTimeMillis();
checkPeersTermTable();
} else {
logger.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}",
memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime));
Expand All @@ -320,6 +331,28 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
}
}

private void checkPeersTermTable() throws Exception {
if (memberState.getSelfId().equals(memberState.getLeaderId())) {
long leaderTerm = memberState.getPeersTermTable().getOrDefault(memberState.getLeaderId(), -1L);
for (Map.Entry<String, Long> entryTerm : memberState.getPeersTermTable().entrySet()) {
if (entryTerm.getKey().equals(memberState.getSelfId())) {
continue;
}

if (entryTerm.getValue() > leaderTerm) {
HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
heartBeatRequest.setGroup(memberState.getGroup());
heartBeatRequest.setLocalId(memberState.getSelfId());
heartBeatRequest.setRemoteId(memberState.getSelfId());
heartBeatRequest.setLeaderId(memberState.getLeaderId());
heartBeatRequest.setNeedCheckMemberState(true);
heartBeatRequest.setTerm(leaderTerm);
dLedgerRpcService.heartBeat(heartBeatRequest);
}
}
}
}

private void maintainAsLeader() throws Exception {
if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) {
long term;
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/openmessaging/storage/dledger/MemberState.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class MemberState {
private long knownMaxTermInGroup = -1;
private Map<String, String> peerMap = new HashMap<>();
private Map<String, Boolean> peersLiveTable = new ConcurrentHashMap<>();
private Map<String, Long> peersTermTable = new HashMap<>();

private volatile String transferee;
private volatile long termToTakeLeadership = -1;
Expand Down Expand Up @@ -132,6 +133,7 @@ public synchronized void changeToLeader(long term) {
this.role = LEADER;
this.leaderId = selfId;
peersLiveTable.clear();
peersTermTable.clear();
}

public synchronized void changeToFollower(long term, String leaderId) {
Expand All @@ -153,6 +155,13 @@ public synchronized void changeToCandidate(long term) {
transferee = null;
}

public synchronized void recoveryToFollower(long term, String leaderId) {
this.role = FOLLOWER;
this.leaderId = leaderId;
this.currTerm = term;
transferee = null;
}

public String getTransferee() {
return transferee;
}
Expand Down Expand Up @@ -226,6 +235,10 @@ public Map<String, Boolean> getPeersLiveTable() {
return peersLiveTable;
}

public Map<String, Long> getPeersTermTable() {
return peersTermTable;
}

//just for test
public void setCurrTermForTest(long term) {
PreConditions.check(term >= currTerm, DLedgerResponseCode.ILLEGAL_MEMBER_STATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,13 @@

public class HeartBeatRequest extends RequestOrResponse {

private boolean needCheckMemberState = false;

public boolean isNeedCheckMemberState() {
return needCheckMemberState;
}

public void setNeedCheckMemberState(boolean needCheckMemberState) {
this.needCheckMemberState = needCheckMemberState;
}
}

0 comments on commit 41872e0

Please sign in to comment.