Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 添加节点参与投票异常监听器 #1100

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,32 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
*/
List<Replicator.ReplicatorStateListener> getReplicatorStatueListeners();

/**
* SOFAJRaft users can implement the NodeStateListener interface by themselves.
* So users can do their own logical operator in this listener when node error, destroyed or had some errors.
*
* @param nodeStateListener added NodeStateListener which is implemented by users.
*/
void addNodeStateListener(final Node.NodeStateListener nodeStateListener);

/**
* End User can remove their implement the NodeStateListener interface by themselves.
*
* @param nodeStateListener need to remove the NodeStateListener which has been added by users.
*/
void removeNodeStateListener(final Node.NodeStateListener nodeStateListener);

/**
* Remove all the NodeStateListener which have been added by users.
*/
void clearNodeStateListener();

/**
* Get the NodeStateListener which have been added by users.
* @return node's nodeStateListener which have been added by users.
*/
List<Node.NodeStateListener> getNodeStateListeners();

/**
* Get the node's target election priority value.
*
Expand Down Expand Up @@ -380,4 +406,35 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
* @since 1.3.12
*/
long getLastAppliedLogIndex();

/**
* User can implement the NodeStateListener interface by themselves.
* So they can do some their own logic codes when node error, destroyed or had some errors.
*
* @author zxuanhong
*
* 2024-04-10 09:23
*/
interface NodeStateListener {

/**
* Called when this can't do preVote as it is not in conf
*
* @param nodeId current node id
* @param options current node options
*/
default void peerNotInConf(NodeId nodeId, NodeOptions options) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validate parameters in peerNotInConf to ensure they are not null before use.

+ if (nodeId == null || options == null) {
+     throw new IllegalArgumentException("nodeId and options cannot be null");
+ }

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
default void peerNotInConf(NodeId nodeId, NodeOptions options) {
default void peerNotInConf(NodeId nodeId, NodeOptions options) {
if (nodeId == null || options == null) {
throw new IllegalArgumentException("nodeId and options cannot be null");
}


}

/**
* Called when this PreVoteResponse no voting granted.
*
* @param nodeId current node id
* @param options current node options
*/
default void noVotingGranted(NodeId nodeId, NodeOptions options) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validate parameters in noVotingGranted to ensure they are not null before use.

+ if (nodeId == null || options == null) {
+     throw new IllegalArgumentException("nodeId and options cannot be null");
+ }

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
default void noVotingGranted(NodeId nodeId, NodeOptions options) {
default void noVotingGranted(NodeId nodeId, NodeOptions options) {
if (nodeId == null || options == null) {
throw new IllegalArgumentException("nodeId and options cannot be null");
}


}
}
}
39 changes: 34 additions & 5 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.stream.Collectors;

import com.alipay.sofa.jraft.rpc.*;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,9 +82,6 @@
import com.alipay.sofa.jraft.option.ReadOnlyServiceOptions;
import com.alipay.sofa.jraft.option.ReplicatorGroupOptions;
import com.alipay.sofa.jraft.option.SnapshotExecutorOptions;
import com.alipay.sofa.jraft.rpc.RaftClientService;
import com.alipay.sofa.jraft.rpc.RaftServerService;
import com.alipay.sofa.jraft.rpc.RpcRequestClosure;
import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesRequest;
import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesResponse;
import com.alipay.sofa.jraft.rpc.RpcRequests.InstallSnapshotRequest;
Expand All @@ -94,8 +92,6 @@
import com.alipay.sofa.jraft.rpc.RpcRequests.RequestVoteResponse;
import com.alipay.sofa.jraft.rpc.RpcRequests.TimeoutNowRequest;
import com.alipay.sofa.jraft.rpc.RpcRequests.TimeoutNowResponse;
import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
import com.alipay.sofa.jraft.rpc.RpcResponseClosureAdapter;
import com.alipay.sofa.jraft.rpc.impl.core.DefaultRaftClientService;
import com.alipay.sofa.jraft.storage.LogManager;
import com.alipay.sofa.jraft.storage.LogStorage;
Expand Down Expand Up @@ -220,6 +216,10 @@ public class NodeImpl implements Node, RaftServerService {

/** ReplicatorStateListeners */
private final CopyOnWriteArrayList<Replicator.ReplicatorStateListener> replicatorStateListeners = new CopyOnWriteArrayList<>();

/** nodeStateListeners */
private final CopyOnWriteArrayList<Node.NodeStateListener> nodeStateListeners = new CopyOnWriteArrayList<>();

/** Node's target leader election priority value */
private volatile int targetPriority;
/** The number of elections time out for current node */
Expand Down Expand Up @@ -2628,6 +2628,10 @@ public void handlePreVoteResponse(final PeerId peerId, final long term, final Re
doUnlock = false;
electSelf();
}
}else{
for(Node.NodeStateListener listener : this.nodeStateListeners) {
RpcUtils.runInThread(() -> listener.noVotingGranted(getNodeId(), options));
}
Comment on lines +2631 to +2634
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using a more robust thread handling mechanism instead of RpcUtils.runInThread for executing listener callbacks to ensure that any exceptions thrown by the listeners do not affect the node's stability.

}
} finally {
if (doUnlock) {
Expand Down Expand Up @@ -2674,6 +2678,9 @@ private void preVote() {
}
if (!this.conf.contains(this.serverId)) {
LOG.warn("Node {} can't do preVote as it is not in conf <{}>.", getNodeId(), this.conf);
for(Node.NodeStateListener listener : this.nodeStateListeners) {
RpcUtils.runInThread(() -> listener.peerNotInConf(getNodeId(), options));
}
return;
}
oldTerm = this.currTerm;
Expand Down Expand Up @@ -3448,6 +3455,28 @@ public List<Replicator.ReplicatorStateListener> getReplicatorStatueListeners() {
return this.replicatorStateListeners;
}

@Override
public void addNodeStateListener(NodeStateListener nodeStateListener) {
Requires.requireNonNull(nodeStateListener, "nodeStateListener");
this.nodeStateListeners.add(nodeStateListener);
}

@Override
public void removeNodeStateListener(NodeStateListener nodeStateListener) {
Requires.requireNonNull(nodeStateListener, "nodeStateListener");
this.nodeStateListeners.remove(nodeStateListener);
}

@Override
public void clearNodeStateListener() {
this.nodeStateListeners.clear();
}

@Override
public List<NodeStateListener> getNodeStateListeners() {
return this.nodeStateListeners;
}

@Override
public int getNodeTargetPriority() {
return this.targetPriority;
Expand Down
Loading