Skip to content

Commit

Permalink
Implementing Flexible Raft with NWR (#1017)
Browse files Browse the repository at this point in the history
* feat(iterator):Auto commit mode for applying log iterator

* feat(iterator):Auto commit mode for applying log iterator

* feat(iterator):Correction parameter name

* feat(iterator):mvn compile

* feat(iterator):Modify comments and annotations

* feat(iterator):Modify comments

* feat(iterator):Modify comments

* feat(rfcs):Add rfcs for Flexible-Raft-with-NWR

* feat(rfcs):Add stepDown change logic and modify parameter name for rfcs

* feat(rfcs):Modify factor calculation rules for rfcs

* feat(rfcs):Modify some language expressions

* feat(rfcs):Implementing Flexible Raft with NWR

* feat(rfcs):compile project

* feat(rfcs):compile project

* feat(rfcs):Remove useless and redundant code

* feat(rfcs):Add rfcs

* feat(flexible):add flexible mode fot Jraft

* feat(flexible):Modify rfc doc

* feat(flexible):Modify quorum code design

* feat(factor):Add ResetFactor API

* feat(flexible):compile code

* feat(flexible):modify code

* feat(flexible):modify code

* feat(flexible):compile code

* fix(flexible):modify raft.desc

* fix(flexible):modify MarshallerHelper

* fix(flexible):modify MarshallerHelper

* fix(flexible):modify RAFT.DESC

* fix(flexible):Modifying comments and code formats

* fix(flexible):Added factor persistence for log storage and Adapt to write heartbeat for w<r situations

* fix(flexible):compile code

* fix(flexible):modify test

* fix(flexible):modify test

* fix(flexible):modify code with proto version changing

* fix(flexible):modify code with proto version changing

* fix(flexible):modify code format

* fix(flexible):modify code format

* fix(flexible):change protoc version from 2.6.1 to 3.5.1

* fix(flexible):modify Class RaftError

* fix(flexible):modify Class format

* fix(flexible):modify Class format

* fix(flexible):modify Class format

* fix(flexible):change int64 to int32 in proto

* fix(flexible):change int64 to int32 in proto

* fix(flexible):avoid importing *

* fix(flexible):change comment

* fix(flexible):add this for voteCtx and preVoteCtx

* fix(flexible):change import order

* fix(flexible):change import order

* feat(flexible):Place attributes such as quorum, factor, and isEnableFlexible into Configuration

* feat(flexible):check format

* feat(flexible):Add test code for resetFactor API

* feat(flexible):modify code format

* feat(flexible):enable flexible check

* feat(flexible):comment useless code

* feat(flexible):modify code

* feat(flexible):Persisting the quorum attribute

* fix(flexible):Delete redundant haveFactor judgments

* fix(flexible):Add necessary null judgment

* fix(flexible):remove config null parameter

* fix(flexible):modify code

* fix(flexible):modify some logic mistakes、remove unused method and add BallotTest

* fix(flexible):modify config check

* fix(flexible):modify config check

* fix(flexible):remove Redundant code

* fix(flexible):remove some check and add NodeOptions.setFactor method

* fix(flexible):remove redundant code

* fix(flexible):解决majority模式下,空指针异常问题

* fix(flexible):修改部分set quorum逻辑,

* feature(flexible):添加flexible raft 测试模块

* feature(flexible):check code format

* feature(flexible):remove code

* feature(flexible):remove redundant blank lines

* feature(flexible):modify test module

* fix(LogManagerTest):modify code format

* fix(LogManagerTest):modify code format

* fix(LogManagerTest):modify code format

* feature(flexible):modify test module in core

* fix(test_core):modify test—core code

* fix(test_module):modify test_core

* fix(test_module):modify testTripleNodesV1V2Codec temporarily

* fix(test_module):modify testTripleNodesV1V2Codec temporarily

* fix(test_module):modify testTripleNodesV1V2Codec temporarily

* fix(test_module):modify testTripleNodesV1V2Codec temporarily

* fix(test_module):modify V1V2Codec error

* fix(test_module):modify V1V2Codec error

* fix(test_module):modify V1V2Codec error

* fix(test_module):modify resetPeers error

* fix(test_module):modify changePeerChaps test error

* fix(test_module):modify v1decoder error

* fix(test_module):modify v1decoder error

* fix(test_module):modify storage test

* fix(test_module):modify storage test

* fix(test_module):modify node test

* fix(test_module):modify node test

* modify code

* modify code

* modify code

---------

Co-authored-by: yuanziwei <[email protected]>
  • Loading branch information
1294566108 and yuanziwei authored Oct 30, 2023
1 parent 22ecd56 commit 49d8842
Show file tree
Hide file tree
Showing 68 changed files with 9,748 additions and 275 deletions.
12 changes: 12 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/CliService.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@
*/
public interface CliService extends Lifecycle<CliOptions> {

/**
* Reset the size of the read or write factor of a raft group in flexible mode
*
* @param groupId the raft group id
* @param conf current configuration
* @param readFactor read factor in flexible raft mode
* @param writeFactor write factor in flexible raft mode
* @return operation status
*/
Status resetFactor(final String groupId, final Configuration conf, final Integer readFactor,
final Integer writeFactor);

/**
* Add a new peer into the replicating group which consists of |conf|.
* return OK status when success.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.NodeImpl;
import com.alipay.sofa.jraft.entity.BallotFactory;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.BootstrapOptions;
import com.alipay.sofa.jraft.util.Endpoint;
Expand Down Expand Up @@ -112,6 +113,9 @@ public static Configuration getConfiguration(final String s) {
return conf;
}
if (conf.parse(s)) {
conf.setEnableFlexible(false);
Quorum quorum = BallotFactory.buildMajorityQuorum(conf.size());
conf.setQuorum(quorum);
return conf;
}
throw new IllegalArgumentException("Invalid conf str:" + s);
Expand Down
12 changes: 12 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,18 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
*/
void changePeers(final Configuration newPeers, final Closure done);

/**
* This method can be called to reset the size of the read or write factor
*
* It should be noted that we cannot change the factory size while changing
* the number of cluster nodes.
*
* @param readFactor read factor for flexible raft
* @param writeFactor write factor for flexible raft
* @since 1.3.14
*/
void resetFactor(final Integer readFactor, final Integer writeFactor, final Closure done);

/**
* Reset the configuration of this node individually, without any replication
* to other peers before this node becomes the leader. This function is
Expand Down
67 changes: 67 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/Quorum.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft;

/**
* @author Akai
*/
public class Quorum {
private int w;

private int r;

public Quorum(int w, int r) {
this.w = w;
this.r = r;
}

public int getW() {
return w;
}

public void setW(int w) {
this.w = w;
}

public int getR() {
return r;
}

public void setR(int r) {
this.r = r;
}

@Override
public String toString() {
return "Quorum{w=" + w + ", r=" + r + '}';
}

@Override
public int hashCode() {
return super.hashCode();
}

@Override
public boolean equals(Object obj) {
if (obj instanceof Quorum) {
Quorum quorum = (Quorum) obj;
return quorum.getR() == this.getR() && quorum.getW() == this.getW();
}
return false;
}

}
108 changes: 94 additions & 14 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/conf/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,43 @@
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.Quorum;
import com.alipay.sofa.jraft.util.Copiable;
import com.alipay.sofa.jraft.util.Requires;

/**
* A configuration with a set of peers.
* @author boyan ([email protected])
* @author Akai
*
* 2018-Mar-15 11:00:26 AM
*/
public class Configuration implements Iterable<PeerId>, Copiable<Configuration> {

private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);
private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);

private static final String LEARNER_POSTFIX = "/learner";
private static final String LEARNER_POSTFIX = "/learner";

private List<PeerId> peers = new ArrayList<>();
private Quorum quorum;

private Integer readFactor;

private Integer writeFactor;

private Boolean isEnableFlexible = false;

private List<PeerId> peers = new ArrayList<>();

// use LinkedHashSet to keep insertion order.
private LinkedHashSet<PeerId> learners = new LinkedHashSet<>();
private LinkedHashSet<PeerId> learners = new LinkedHashSet<>();

public Configuration() {
super();
Expand All @@ -68,16 +79,34 @@ public Configuration(final Iterable<PeerId> conf) {
* @param conf configuration
*/
public Configuration(final Configuration conf) {
this(conf.getPeers(), conf.getLearners());
this(conf.getPeers(), conf.getLearners(), conf.getQuorum(), conf.getReadFactor(), conf.getWriteFactor(), conf
.isEnableFlexible());
}

/**
* Construct a Configuration instance with peers and learners.
*
* @param conf peers configuration
* @param learners learners
* @since 1.3.0
* @param conf peers configuration
* @param learners learners
* @param quorum quorum
* @param readFactor read factor
* @param writeFactor write factor
* @param isEnableFlexible enable flexible mode or not
* @since 1.3.14
*/
public Configuration(final Iterable<PeerId> conf, final Iterable<PeerId> learners, final Quorum quorum,
final Integer readFactor, final Integer writeFactor, final Boolean isEnableFlexible) {
Requires.requireNonNull(conf, "conf");
for (final PeerId peer : conf) {
this.peers.add(peer.copy());
}
addLearners(learners);
this.quorum = quorum;
this.readFactor = readFactor;
this.writeFactor = writeFactor;
this.isEnableFlexible = isEnableFlexible;
}

public Configuration(final Iterable<PeerId> conf, final Iterable<PeerId> learners) {
Requires.requireNonNull(conf, "conf");
for (final PeerId peer : conf) {
Expand All @@ -86,6 +115,38 @@ public Configuration(final Iterable<PeerId> conf, final Iterable<PeerId> learner
addLearners(learners);
}

public Integer getReadFactor() {
return readFactor;
}

public void setReadFactor(Integer readFactor) {
this.readFactor = readFactor;
}

public Integer getWriteFactor() {
return writeFactor;
}

public void setWriteFactor(Integer writeFactor) {
this.writeFactor = writeFactor;
}

public Quorum getQuorum() {
return this.quorum;
}

public void setQuorum(Quorum quorum) {
this.quorum = quorum;
}

public Boolean isEnableFlexible() {
return isEnableFlexible;
}

public void setEnableFlexible(Boolean enableFlexible) {
isEnableFlexible = enableFlexible;
}

public void setLearners(final LinkedHashSet<PeerId> learners) {
this.learners = learners;
}
Expand Down Expand Up @@ -148,7 +209,7 @@ public List<PeerId> listLearners() {

@Override
public Configuration copy() {
return new Configuration(this.peers, this.learners);
return new Configuration(this);
}

/**
Expand Down Expand Up @@ -251,12 +312,32 @@ public boolean equals(final Object obj) {
if (this.peers == null) {
return other.peers == null;
} else {
return this.peers.equals(other.peers);
return this.peers.equals(other.peers) && Objects.equals(this.quorum, other.quorum)
&& Objects.equals(this.readFactor, other.readFactor)
&& Objects.equals(this.writeFactor, other.writeFactor)
&& Objects.equals(this.isEnableFlexible, other.isEnableFlexible);
}
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder(toBasicString());

if (Objects.nonNull(isEnableFlexible) && !isEmpty()) {
sb.append(",isEnableFlexible:").append(isEnableFlexible);
}

if (Objects.nonNull(readFactor) || Objects.nonNull(writeFactor)) {
sb.append(",readFactor:").append(readFactor).append(",writeFactor:").append(writeFactor);
}

if (Objects.nonNull(quorum)) {
sb.append(",quorum:").append(quorum);
}
return sb.toString();
}

public String toBasicString() {
final StringBuilder sb = new StringBuilder();
final List<PeerId> peers = listPeers();
int i = 0;
Expand All @@ -278,7 +359,6 @@ public String toString() {
}
i++;
}

return sb.toString();
}

Expand Down Expand Up @@ -311,9 +391,9 @@ public boolean parse(final String conf) {
}

/**
* Get the difference between |*this| and |rhs|
* |included| would be assigned to |*this| - |rhs|
* |excluded| would be assigned to |rhs| - |*this|
* Get the difference between |*this| and |rhs|
* |included| would be assigned to |*this| - |rhs|
* |excluded| would be assigned to |rhs| - |*this|
*/
public void diff(final Configuration rhs, final Configuration included, final Configuration excluded) {
included.peers = new ArrayList<>(this.peers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import javax.annotation.concurrent.ThreadSafe;

import com.alipay.sofa.jraft.entity.Ballot;
import com.alipay.sofa.jraft.entity.PeerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,8 +30,6 @@
import com.alipay.sofa.jraft.Lifecycle;
import com.alipay.sofa.jraft.closure.ClosureQueue;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.Ballot;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.BallotBoxOptions;
import com.alipay.sofa.jraft.util.Describer;
import com.alipay.sofa.jraft.util.OnlyForTest;
Expand Down Expand Up @@ -114,12 +114,13 @@ public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final
final long startAt = Math.max(this.pendingIndex, firstLogIndex);
Ballot.PosHint hint = new Ballot.PosHint();
for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
hint = bl.grant(peer, hint);
if (bl.isGranted()) {
final Ballot ballot = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
hint = ballot.grant(peer, hint);
if (ballot.isGranted()) {
lastCommittedIndex = logIndex;
}
}

if (lastCommittedIndex == 0) {
return true;
}
Expand Down Expand Up @@ -198,8 +199,8 @@ public boolean resetPendingIndex(final long newPendingIndex) {
* @return returns true on success
*/
public boolean appendPendingTask(final Configuration conf, final Configuration oldConf, final Closure done) {
final Ballot bl = new Ballot();
if (!bl.init(conf, oldConf)) {
final Ballot ballot = new Ballot();
if (!ballot.init(conf, oldConf)) {
LOG.error("Fail to init ballot.");
return false;
}
Expand All @@ -209,7 +210,7 @@ public boolean appendPendingTask(final Configuration conf, final Configuration o
LOG.error("Node {} fail to appendingTask, pendingIndex={}.", this.opts.getNodeId(), this.pendingIndex);
return false;
}
this.pendingMetaQueue.add(bl);
this.pendingMetaQueue.add(ballot);
this.closureQueue.appendPendingClosure(done);
return true;
} finally {
Expand Down
Loading

0 comments on commit 49d8842

Please sign in to comment.