From 881f47ed0962f632bd78dcf7685a07a65e344ae2 Mon Sep 17 00:00:00 2001 From: Xianming Lei <31424839+leixm@users.noreply.github.com> Date: Fri, 14 Jun 2024 06:25:17 +0800 Subject: [PATCH] [CELEBORN-1452][0.4] Master follower node metadata is out of sync after installing snapshot ### What changes were proposed in this pull request? backport https://github.com/apache/celeborn/pull/2547 to `branch-0.4` Fix Master follower node metadata is out of sync after installing snapshot ### Why are the changes needed? Follower node metadata is out of sync, when a master-slave switchover occurs, there are major risks to the stability of the cluster. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT. Closes #2563 from cfmcgrady/CELEBORN-1452-branch-0.4. Lead-authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com> Co-authored-by: Fu Chen Signed-off-by: Shuang --- LICENSE | 3 + .../ha/CelebornStateMachineStorage.java | 212 ++++++++++++ .../master/clustermeta/ha/HAHelper.java | 4 +- .../master/clustermeta/ha/HARaftServer.java | 5 + .../master/clustermeta/ha/StateMachine.java | 19 +- .../ha/MasterStateMachineSuiteJ.java | 303 ++++++++++++++++++ 6 files changed, 527 insertions(+), 19 deletions(-) create mode 100644 master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java diff --git a/LICENSE b/LICENSE index 865b656dacb..547515d12e2 100644 --- a/LICENSE +++ b/LICENSE @@ -246,3 +246,6 @@ Remote Shuffle Service for Flink ./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java ./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java ./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java + +Apache Ratis +./master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java new file mode 100644 index 00000000000..5035a838089 --- /dev/null +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.master.clustermeta.ha; + +import static org.apache.ratis.util.MD5FileUtil.MD5_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.ratis.io.MD5Hash; +import org.apache.ratis.server.storage.FileInfo; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.statemachine.SnapshotRetentionPolicy; +import org.apache.ratis.statemachine.StateMachineStorage; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; +import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.MD5FileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Copied from Apache Ratis {@link SimpleStateMachineStorage}, We need to refresh latest snapshot + * after installing snapshot from leader which makes StateMachine load latest snapshot correctly. + */ +public class CelebornStateMachineStorage implements StateMachineStorage { + + private static final Logger LOG = LoggerFactory.getLogger(CelebornStateMachineStorage.class); + + static final String SNAPSHOT_FILE_PREFIX = "snapshot"; + /** snapshot.term_index */ + public static final Pattern SNAPSHOT_REGEX = + Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)"); + + public static final Pattern SNAPSHOT_MD5_REGEX = + Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)" + MD5_SUFFIX); + private static final DirectoryStream.Filter SNAPSHOT_MD5_FILTER = + entry -> + Optional.ofNullable(entry.getFileName()) + .map(Path::toString) + .map(SNAPSHOT_MD5_REGEX::matcher) + .filter(Matcher::matches) + .isPresent(); + + private volatile File stateMachineDir = null; + private final AtomicReference latestSnapshot = new AtomicReference<>(); + + File tmpDir = null; + + @Override + public void init(RaftStorage storage) throws IOException { + this.stateMachineDir = storage.getStorageDir().getStateMachineDir(); + loadLatestSnapshot(); + tmpDir = storage.getStorageDir().getTmpDir(); + } + + @Override + public void format() throws IOException { + // TODO + } + + static List getSingleFileSnapshotInfos(Path dir) throws IOException { + final List infos = new ArrayList<>(); + try (DirectoryStream stream = Files.newDirectoryStream(dir)) { + for (Path path : stream) { + final Path filename = path.getFileName(); + if (filename != null) { + final Matcher matcher = SNAPSHOT_REGEX.matcher(filename.toString()); + if (matcher.matches()) { + final long term = Long.parseLong(matcher.group(1)); + final long index = Long.parseLong(matcher.group(2)); + final FileInfo fileInfo = new FileInfo(path, null); // No FileDigest here. + infos.add(new SingleFileSnapshotInfo(fileInfo, term, index)); + } + } + } + } + return infos; + } + + @Override + public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) + throws IOException { + if (stateMachineDir == null) { + return; + } + + final int numSnapshotsRetained = + Optional.ofNullable(snapshotRetentionPolicy) + .map(SnapshotRetentionPolicy::getNumSnapshotsRetained) + .orElse(SnapshotRetentionPolicy.DEFAULT_ALL_SNAPSHOTS_RETAINED); + if (numSnapshotsRetained <= 0) { + return; + } + + final List allSnapshotFiles = + getSingleFileSnapshotInfos(stateMachineDir.toPath()); + + if (allSnapshotFiles.size() > snapshotRetentionPolicy.getNumSnapshotsRetained()) { + allSnapshotFiles.sort(Comparator.comparing(SingleFileSnapshotInfo::getIndex).reversed()); + allSnapshotFiles.subList(numSnapshotsRetained, allSnapshotFiles.size()).stream() + .map(SingleFileSnapshotInfo::getFile) + .map(FileInfo::getPath) + .forEach( + snapshotPath -> { + LOG.info("Deleting old snapshot at {}", snapshotPath.toAbsolutePath()); + FileUtils.deletePathQuietly(snapshotPath); + }); + // clean up the md5 files if the corresponding snapshot file does not exist + try (DirectoryStream stream = + Files.newDirectoryStream(stateMachineDir.toPath(), SNAPSHOT_MD5_FILTER)) { + for (Path md5path : stream) { + Path md5FileNamePath = md5path.getFileName(); + if (md5FileNamePath == null) { + continue; + } + final String md5FileName = md5FileNamePath.toString(); + final File snapshotFile = + new File( + stateMachineDir, + md5FileName.substring(0, md5FileName.length() - MD5_SUFFIX.length())); + if (!snapshotFile.exists()) { + FileUtils.deletePathQuietly(md5path); + } + } + } + } + } + + public File getSnapshotFile(long term, long endIndex) { + final File dir = Objects.requireNonNull(stateMachineDir, "stateMachineDir == null"); + return new File(dir, getSnapshotFileName(term, endIndex)); + } + + static SingleFileSnapshotInfo findLatestSnapshot(Path dir) throws IOException { + final Iterator i = getSingleFileSnapshotInfos(dir).iterator(); + if (!i.hasNext()) { + return null; + } + + SingleFileSnapshotInfo latest = i.next(); + for (; i.hasNext(); ) { + final SingleFileSnapshotInfo info = i.next(); + if (info.getIndex() > latest.getIndex()) { + latest = info; + } + } + + // read md5 + final Path path = latest.getFile().getPath(); + final MD5Hash md5 = MD5FileUtil.readStoredMd5ForFile(path.toFile()); + final FileInfo info = new FileInfo(path, md5); + return new SingleFileSnapshotInfo(info, latest.getTerm(), latest.getIndex()); + } + + public SingleFileSnapshotInfo updateLatestSnapshot(SingleFileSnapshotInfo info) { + return latestSnapshot.updateAndGet( + previous -> previous == null || info.getIndex() > previous.getIndex() ? info : previous); + } + + public static String getSnapshotFileName(long term, long endIndex) { + return SNAPSHOT_FILE_PREFIX + "." + term + "_" + endIndex; + } + + @Override + public SingleFileSnapshotInfo getLatestSnapshot() { + return latestSnapshot.get(); + } + + @Override + public File getTmpDir() { + return tmpDir; + } + + public void loadLatestSnapshot() { + final File dir = stateMachineDir; + if (dir == null) { + return; + } + try { + updateLatestSnapshot(findLatestSnapshot(dir.toPath())); + } catch (IOException ignored) { + } + } +} diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java index b0afab4b137..381e8a21df8 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java @@ -23,7 +23,6 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.protocol.Message; -import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.celeborn.common.client.MasterNotLeaderException; @@ -123,7 +122,8 @@ public static Message convertResponseToMessage(ResourceProtos.ResourceResponse r * @return the temporary snapshot file * @throws IOException if error occurred while creating the snapshot file */ - public static File createTempSnapshotFile(SimpleStateMachineStorage storage) throws IOException { + public static File createTempSnapshotFile(CelebornStateMachineStorage storage) + throws IOException { File tempDir = storage.getTmpDir(); if (!tempDir.isDirectory() && !tempDir.mkdir()) { throw new IOException( diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java index 61222549c58..fd00c6fa7e8 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java @@ -555,6 +555,11 @@ public long getAppTimeoutDeadline() { return appTimeoutDeadline; } + @VisibleForTesting + public RaftServer getServer() { + return server; + } + public static class LeaderPeerEndpoints { // the rpcEndpoints Tuple2 (ip:port, host:port) public final Tuple2 rpcEndpoints; diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java index a6be2b3174e..ff43e59c211 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java @@ -45,7 +45,6 @@ import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.statemachine.impl.BaseStateMachine; -import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.ExitUtils; @@ -61,22 +60,7 @@ public class StateMachine extends BaseStateMachine { private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class); - private final SimpleStateMachineStorage storage = - new SimpleStateMachineStorage() { - - File tmpDir = null; - - @Override - public void init(RaftStorage storage) throws IOException { - super.init(storage); - tmpDir = storage.getStorageDir().getTmpDir(); - } - - @Override - public File getTmpDir() { - return tmpDir; - } - }; + private final CelebornStateMachineStorage storage = new CelebornStateMachineStorage(); private final HARaftServer masterRatisServer; private RaftGroupId raftGroupId; @@ -114,6 +98,7 @@ public void initialize(RaftServer server, RaftGroupId id, RaftStorage raftStorag public void reinitialize() throws IOException { LOG.info("Reinitializing state machine."); getLifeCycle().compareAndTransition(PAUSED, STARTING); + storage.loadLatestSnapshot(); loadSnapshot(storage.getLatestSnapshot()); getLifeCycle().compareAndTransition(STARTING, RUNNING); } diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java index 79fb047c56b..8bd76a4a9d0 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java @@ -19,22 +19,32 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.statemachine.SnapshotInfo; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.client.MasterClient; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.meta.AppDiskUsageSnapShot; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.quota.ResourceConsumption; +import org.apache.celeborn.common.rpc.RpcEnv; import org.apache.celeborn.common.util.JavaUtils; +import org.apache.celeborn.common.util.Utils; import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos; import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.RequestSlotsRequest; import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceRequest; @@ -43,6 +53,8 @@ public class MasterStateMachineSuiteJ extends RatisBaseSuiteJ { + private final AtomicLong callerId = new AtomicLong(); + @Test public void testRunCommand() { StateMachine stateMachine = ratisServer.getMasterStateMachine(); @@ -250,4 +262,295 @@ public void testObjSerde() throws IOException, InterruptedException { masterStatusSystem.restoreMetaFromFile(tmpFile); Assert.assertEquals(3, masterStatusSystem.workers.size()); } + + private String getNewReqeustId() { + return MasterClient.encodeRequestId(UUID.randomUUID().toString(), callerId.incrementAndGet()); + } + + private void pauseRaftServer(RaftServer server) + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, + ClassNotFoundException { + Method method = server.getClass().getDeclaredMethod("getImpls"); + method.setAccessible(true); + Object serverImpl = ((List) method.invoke(server)).get(0); + + Class privateClass = Class.forName("org.apache.ratis.server.impl.RaftServerImpl"); + Method pauseMethod = privateClass.getDeclaredMethod("pause"); + pauseMethod.setAccessible(true); + pauseMethod.invoke(serverImpl); + } + + private void resumeRaftServer(RaftServer server) + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, + ClassNotFoundException { + Method method = server.getClass().getDeclaredMethod("getImpls"); + method.setAccessible(true); + Object serverImpl = ((List) method.invoke(server)).get(0); + + Class privateClass = Class.forName("org.apache.ratis.server.impl.RaftServerImpl"); + Method pauseMethod = privateClass.getDeclaredMethod("resume"); + pauseMethod.setAccessible(true); + pauseMethod.invoke(serverImpl); + } + + public List startRaftServers() throws IOException, InterruptedException { + CelebornConf conf1 = new CelebornConf(); + CelebornConf conf2 = new CelebornConf(); + CelebornConf conf3 = new CelebornConf(); + File tmpDir1 = File.createTempFile("celeborn-ratis" + 1, "for-test-only"); + tmpDir1.delete(); + tmpDir1.mkdirs(); + conf1.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR().key(), tmpDir1.getAbsolutePath()); + conf1.set(CelebornConf.HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD().key(), "100"); + conf1.set(CelebornConf.HA_MASTER_RATIS_LOG_PURGE_GAP().key(), "200"); + conf1.set(CelebornConf.HA_MASTER_RATIS_LOG_SEGMENT_SIZE_MAX().key(), "13490"); + + File tmpDir2 = File.createTempFile("celeborn-ratis" + 2, "for-test-only"); + tmpDir2.delete(); + tmpDir2.mkdirs(); + conf2.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR().key(), tmpDir2.getAbsolutePath()); + conf2.set(CelebornConf.HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD().key(), "100"); + conf2.set(CelebornConf.HA_MASTER_RATIS_LOG_PURGE_GAP().key(), "200"); + conf2.set(CelebornConf.HA_MASTER_RATIS_LOG_SEGMENT_SIZE_MAX().key(), "13490"); + + File tmpDir3 = File.createTempFile("celeborn-ratis" + 3, "for-test-only"); + tmpDir3.delete(); + tmpDir3.mkdirs(); + conf3.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR().key(), tmpDir3.getAbsolutePath()); + conf3.set(CelebornConf.HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD().key(), "100"); + conf3.set(CelebornConf.HA_MASTER_RATIS_LOG_PURGE_GAP().key(), "200"); + conf3.set(CelebornConf.HA_MASTER_RATIS_LOG_SEGMENT_SIZE_MAX().key(), "13490"); + + RpcEnv mockRpcEnv = Mockito.mock(RpcEnv.class); + HAMasterMetaManager masterStatusSystem1 = new HAMasterMetaManager(mockRpcEnv, conf1); + HAMasterMetaManager masterStatusSystem2 = new HAMasterMetaManager(mockRpcEnv, conf2); + HAMasterMetaManager masterStatusSystem3 = new HAMasterMetaManager(mockRpcEnv, conf3); + MetaHandler handler1 = new MetaHandler(masterStatusSystem1); + MetaHandler handler2 = new MetaHandler(masterStatusSystem2); + MetaHandler handler3 = new MetaHandler(masterStatusSystem3); + MasterNode masterNode1 = + new MasterNode.Builder() + .setHost(Utils.localHostName(conf1)) + .setRatisPort(9200) + .setRpcPort(9201) + .setNodeId(UUID.randomUUID().toString()) + .build(); + MasterNode masterNode2 = + new MasterNode.Builder() + .setHost(Utils.localHostName(conf2)) + .setRatisPort(9204) + .setRpcPort(9205) + .setNodeId(UUID.randomUUID().toString()) + .build(); + MasterNode masterNode3 = + new MasterNode.Builder() + .setHost(Utils.localHostName(conf3)) + .setRatisPort(9207) + .setRpcPort(9208) + .setNodeId(UUID.randomUUID().toString()) + .build(); + HARaftServer raftServer1 = + HARaftServer.newMasterRatisServer( + handler1, conf1, masterNode1, Arrays.asList(masterNode2, masterNode3)); + HARaftServer raftServer2 = + HARaftServer.newMasterRatisServer( + handler2, conf2, masterNode2, Arrays.asList(masterNode1, masterNode3)); + HARaftServer raftServer3 = + HARaftServer.newMasterRatisServer( + handler3, conf3, masterNode3, Arrays.asList(masterNode1, masterNode2)); + masterStatusSystem1.setRatisServer(raftServer1); + masterStatusSystem2.setRatisServer(raftServer2); + masterStatusSystem3.setRatisServer(raftServer3); + raftServer1.start(); + raftServer2.start(); + raftServer3.start(); + Thread.sleep(15 * 1000); + + HAMasterMetaManager leaderStatusSystem; + HAMasterMetaManager followerStatusSystem1; + HAMasterMetaManager followerStatusSystem2; + if (raftServer1.isLeader()) { + leaderStatusSystem = masterStatusSystem1; + followerStatusSystem1 = masterStatusSystem2; + followerStatusSystem2 = masterStatusSystem3; + } else if (raftServer2.isLeader()) { + leaderStatusSystem = masterStatusSystem2; + followerStatusSystem1 = masterStatusSystem1; + followerStatusSystem2 = masterStatusSystem3; + } else { + leaderStatusSystem = masterStatusSystem3; + followerStatusSystem1 = masterStatusSystem1; + followerStatusSystem2 = masterStatusSystem2; + } + return Arrays.asList(leaderStatusSystem, followerStatusSystem1, followerStatusSystem2); + } + + public void stopRaftServers(List raftServers) { + for (HAMasterMetaManager metaManager : raftServers) { + metaManager.getRatisServer().stop(); + } + } + + @Test + public void testInstallSnapshot() + throws IOException, InterruptedException, InvocationTargetException, NoSuchMethodException, + IllegalAccessException, ClassNotFoundException { + /** + * We will first write 180 logs, then pause follower2, and then write 20 logs. At this time, the + * leader will trigger a snapshot, at this time purge condition is reached, purge closed + * segments. When follower2 resumes, install snapshot from leader will be triggered. + */ + List raftServers = startRaftServers(); + HAMasterMetaManager leaderStatusSystem = raftServers.get(0); + HAMasterMetaManager followerStatusSystem1 = raftServers.get(1); + HAMasterMetaManager followerStatusSystem2 = raftServers.get(2); + Map disks1 = new HashMap<>(); + Map userResourceConsumption1 = new HashMap<>(); + + // per register produces 2 logs + for (int i = 0; i < 50; i++) { + leaderStatusSystem.handleRegisterWorker( + "host1", + 1000 + i, + 2000 + i, + 3000 + i, + 4000 + i, + disks1, + userResourceConsumption1, + getNewReqeustId()); + } + // wait for taking snapshot + Thread.sleep(2000); + Assert.assertEquals( + 100, + leaderStatusSystem.getRatisServer().getMasterStateMachine().getLatestSnapshot().getIndex()); + + Assert.assertEquals( + 100, + followerStatusSystem1 + .getRatisServer() + .getMasterStateMachine() + .getLatestSnapshot() + .getIndex()); + + Assert.assertEquals( + 100, + followerStatusSystem2 + .getRatisServer() + .getMasterStateMachine() + .getLatestSnapshot() + .getIndex()); + + for (int i = 0; i < 40; i++) { + leaderStatusSystem.handleRegisterWorker( + "host1", + 1100 + i, + 2100 + i, + 3100 + i, + 4100 + i, + disks1, + userResourceConsumption1, + getNewReqeustId()); + } + // wait for sync + Thread.sleep(2000); + + Assert.assertEquals( + 180, + leaderStatusSystem + .getRatisServer() + .getMasterStateMachine() + .getLastAppliedTermIndex() + .getIndex()); + + Assert.assertEquals( + 180, + followerStatusSystem1 + .getRatisServer() + .getMasterStateMachine() + .getLastAppliedTermIndex() + .getIndex()); + + Assert.assertEquals( + 180, + followerStatusSystem2 + .getRatisServer() + .getMasterStateMachine() + .getLastAppliedTermIndex() + .getIndex()); + + pauseRaftServer(followerStatusSystem2.getRatisServer().getServer()); + Thread.sleep(200); + for (int i = 0; i < 10; i++) { + leaderStatusSystem.handleRegisterWorker( + "host1", + 1180 + i, + 2180 + i, + 3180 + i, + 4180 + i, + disks1, + userResourceConsumption1, + getNewReqeustId()); + } + Thread.sleep(2000); + Assert.assertEquals( + 200, + leaderStatusSystem.getRatisServer().getMasterStateMachine().getLatestSnapshot().getIndex()); + Assert.assertEquals( + 200, + leaderStatusSystem + .getRatisServer() + .getMasterStateMachine() + .getLastAppliedTermIndex() + .getIndex()); + + Assert.assertEquals( + 200, + followerStatusSystem1 + .getRatisServer() + .getMasterStateMachine() + .getLatestSnapshot() + .getIndex()); + Assert.assertEquals( + 200, + followerStatusSystem1 + .getRatisServer() + .getMasterStateMachine() + .getLastAppliedTermIndex() + .getIndex()); + + Assert.assertEquals( + 100, + followerStatusSystem2 + .getRatisServer() + .getMasterStateMachine() + .getLatestSnapshot() + .getIndex()); + Assert.assertEquals( + 180, + followerStatusSystem2 + .getRatisServer() + .getMasterStateMachine() + .getLastAppliedTermIndex() + .getIndex()); + + resumeRaftServer(followerStatusSystem2.getRatisServer().getServer()); + // install snapshot from leader + Thread.sleep(5000); + Assert.assertEquals( + 200, + followerStatusSystem2 + .getRatisServer() + .getMasterStateMachine() + .getLatestSnapshot() + .getIndex()); + Assert.assertEquals( + 200, + followerStatusSystem2 + .getRatisServer() + .getMasterStateMachine() + .getLastAppliedTermIndex() + .getIndex()); + stopRaftServers(raftServers); + } }