Skip to content

Commit

Permalink
Fix findbugs & build proto only if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Dec 13, 2023
1 parent 4a1ec94 commit 4f03550
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,35 @@

/** Caching the commit information. */
class CommitInfoCache {
static Long max(Long oldCommitIndex, long newCommitIndex) {
return oldCommitIndex == null || newCommitIndex > oldCommitIndex ? newCommitIndex : oldCommitIndex;
}

static BiFunction<RaftPeerId, Long, Long> remapping(long newCommitIndex) {
return (id, oldCommitIndex) -> max(oldCommitIndex, newCommitIndex);
return (id, oldCommitIndex) -> oldCommitIndex == null || newCommitIndex > oldCommitIndex ?
newCommitIndex : oldCommitIndex;
}

private final ConcurrentMap<RaftPeerId, Long> map = new ConcurrentHashMap<>();

CommitInfoProto get(RaftPeer peer) {
return ProtoUtils.toCommitInfoProto(peer, get(peer.getId()));
Optional<Long> get(RaftPeerId id) {
return Optional.ofNullable(map.get(id));
}

long get(RaftPeerId id) {
return Optional.ofNullable(map.get(id)).orElse(0L);
long getNonNull(RaftPeerId id) {
return get(id).orElse(0L);
}

CommitInfoProto update(RaftPeer peer, long newCommitIndex) {
Objects.requireNonNull(peer, "peer == null");
final long computed = map.compute(peer.getId(), remapping(newCommitIndex));
return ProtoUtils.toCommitInfoProto(peer, computed);
final long updated = update(peer.getId(), newCommitIndex);
return ProtoUtils.toCommitInfoProto(peer, updated);
}

long update(RaftPeerId id, long newCommitIndex) {
Objects.requireNonNull(id, "peer == null");
return map.compute(id, remapping(newCommitIndex));
}

void update(CommitInfoProto newInfo) {
final long newCommitIndex = newInfo.getCommitIndex();
map.compute(RaftPeerId.valueOf(newInfo.getServer().getId()), remapping(newCommitIndex));
final RaftPeerId id = RaftPeerId.valueOf(newInfo.getServer().getId());
update(id, newInfo.getCommitIndex());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.ratis.server.impl;

import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
Expand All @@ -31,17 +34,29 @@
* entries.
*/
public class ConfigurationManager {
private final RaftPeerId id;
private final RaftConfigurationImpl initialConf;
private final NavigableMap<Long, RaftConfigurationImpl> configurations = new TreeMap<>();
/**
* The current raft configuration. If configurations is not empty, should be
* the last entry of the map. Otherwise is initialConf.
*/
private volatile RaftConfigurationImpl currentConf;
/** Cache the peer corresponding to {@link #id}. */
private volatile RaftPeer currentPeer;

ConfigurationManager(RaftConfigurationImpl initialConf) {
ConfigurationManager(RaftPeerId id, RaftConfigurationImpl initialConf) {
this.id = id;
this.initialConf = initialConf;
this.currentConf = initialConf;
setCurrentConf(initialConf);
}

private void setCurrentConf(RaftConfigurationImpl currentConf) {
this.currentConf = currentConf;
final RaftPeer peer = currentConf.getPeer(id, RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER);
if (peer != null) {
this.currentPeer = peer;
}
}

synchronized void addConfiguration(RaftConfiguration conf) {
Expand All @@ -57,29 +72,32 @@ synchronized void addConfiguration(RaftConfiguration conf) {
private void addRaftConfigurationImpl(long logIndex, RaftConfigurationImpl conf) {
configurations.put(logIndex, conf);
if (logIndex == configurations.lastEntry().getKey()) {
currentConf = conf;
setCurrentConf(conf);
}
}

RaftConfigurationImpl getCurrent() {
return currentConf;
}

RaftPeer getCurrentPeer() {
return currentPeer;
}

/**
* Remove all the configurations whose log index is >= the given index.
*
* @param index The given index. All the configurations whose log index is >=
* this value will be removed.
* @return The configuration with largest log index < the given index.
*/
synchronized RaftConfiguration removeConfigurations(long index) {
synchronized void removeConfigurations(long index) {
// remove all configurations starting at the index
for(final Iterator<?> iter = configurations.tailMap(index).entrySet().iterator(); iter.hasNext();) {
iter.next();
iter.remove();
final SortedMap<Long, RaftConfigurationImpl> tail = configurations.tailMap(index);
if (tail.isEmpty()) {
return;
}
currentConf = configurations.isEmpty() ? initialConf :
configurations.lastEntry().getValue();
return currentConf;
tail.clear();
setCurrentConf(configurations.isEmpty() ? initialConf : configurations.lastEntry().getValue());
}

synchronized int numOfConf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public long[] getFollowerNextIndices() {
this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(
getMemberId(), state::getLastLeaderElapsedTimeMs);
this.raftServerMetrics = RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics(
getMemberId(), commitInfoCache::get, retryCache::getStatistics);
getMemberId(), commitInfoCache::getNonNull, retryCache::getStatistics);

this.startComplete = new AtomicBoolean(false);
this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString());
Expand Down Expand Up @@ -452,6 +452,12 @@ public RaftGroupMemberId getMemberId() {
return getState().getMemberId();
}

@Override
public RaftPeer getPeer() {
return Optional.ofNullable(getState().getCurrentPeer())
.orElseGet(() -> getRaftServer().getPeer());
}

@Override
public DivisionInfo getInfo() {
return info;
Expand Down Expand Up @@ -622,7 +628,8 @@ synchronized void changeToLeader() {
public Collection<CommitInfoProto> getCommitInfos() {
final List<CommitInfoProto> infos = new ArrayList<>();
// add the commit info of this server
infos.add(updateCommitInfoCache());
final long commitIndex = updateCommitInfoCache();
infos.add(ProtoUtils.toCommitInfoProto(getPeer(), commitIndex));

// add the commit infos of other servers
if (getInfo().isLeader()) {
Expand All @@ -634,7 +641,9 @@ public Collection<CommitInfoProto> getCommitInfos() {
raftConf.getAllPeers(RaftPeerRole.FOLLOWER).stream(),
raftConf.getAllPeers(RaftPeerRole.LISTENER).stream())
.filter(peer -> !peer.getId().equals(getId()))
.map(commitInfoCache::get)
.map(peer -> commitInfoCache.get(peer.getId())
.map(index -> ProtoUtils.toCommitInfoProto(peer, index))
.orElse(null))
.filter(Objects::nonNull)
.forEach(infos::add);
}
Expand Down Expand Up @@ -1533,8 +1542,8 @@ private void preAppendEntriesAsync(RaftPeerId leaderId, RaftGroupId leaderGroupI
}
}

private CommitInfoProto updateCommitInfoCache() {
return commitInfoCache.update(getPeer(), state.getLog().getLastCommittedIndex());
private long updateCommitInfoCache() {
return commitInfoCache.update(getId(), state.getLog().getLastCommittedIndex());
}

ExecutorService getServerExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class ServerState {
final RaftConfigurationImpl initialConf = RaftConfigurationImpl.newBuilder()
.setConf(followerPeers, listenerPeers)
.build();
configurationManager = new ConfigurationManager(initialConf);
configurationManager = new ConfigurationManager(id, initialConf);
LOG.info("{}: {}", getMemberId(), configurationManager);

final String storageDirName = group.getGroupId().getUuid().toString();
Expand Down Expand Up @@ -196,6 +196,10 @@ RaftConfigurationImpl getRaftConf() {
return configurationManager.getCurrent();
}

RaftPeer getCurrentPeer() {
return configurationManager.getCurrentPeer();
}

long getCurrentTerm() {
return currentTerm.get();
}
Expand Down

0 comments on commit 4f03550

Please sign in to comment.