From adbef7b4412db9eecbe1a039a1cc7d4a8ea9f4d6 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Thu, 11 Jul 2024 16:29:58 +0800 Subject: [PATCH] [CELEBORN-1499] Bump Ratis version from 3.0.1 to 3.1.0 ### What changes were proposed in this pull request? Bump Ratis version from 3.0.1 to 3.1.0. Meanwhile, remove `CelebornStateMachineStorage` with the release of https://github.com/apache/ratis/pull/1111. ### Why are the changes needed? Bump Ratis version from 3.0.1 to 3.1.0. Ratis has released v3.1.0, of which release note refers to [3.1.0](https://ratis.apache.org/post/3.1.0.html). The 3.1.0 version is a minor release with multiple improvements and bugfixes including [[RATIS-2111] Reinitialize should load the latest snapshot](https://issues.apache.org/jira/browse/RATIS-2111). See the [changes between 3.0.1 and 3.1.0](https://github.com/apache/ratis/compare/ratis-3.0.1...ratis-3.1.0) releases. Follow up #2547. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? `MasterStateMachineSuiteJ#testInstallSnapshot` Closes #2610 from SteNicholas/CELEBORN-1499. Authored-by: SteNicholas Signed-off-by: Shuang --- LICENSE | 3 - dev/deps/dependencies-server | 18 +- .../ha/CelebornStateMachineStorage.java | 212 ------------------ .../master/clustermeta/ha/HAHelper.java | 4 +- .../master/clustermeta/ha/StateMachine.java | 18 +- pom.xml | 2 +- project/CelebornBuild.scala | 2 +- 7 files changed, 30 insertions(+), 229 deletions(-) delete mode 100644 master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java diff --git a/LICENSE b/LICENSE index 44365ce1851..3c8858ec9f5 100644 --- a/LICENSE +++ b/LICENSE @@ -256,6 +256,3 @@ Remote Shuffle Service for Flink ./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/DataBuffer.java ./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBasedDataBuffer.java ./client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java - -Apache Ratis -./master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server index 33b965865e0..2cdab3abf82 100644 --- a/dev/deps/dependencies-server +++ b/dev/deps/dependencies-server @@ -118,15 +118,15 @@ netty-transport/4.1.109.Final//netty-transport-4.1.109.Final.jar osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar paranamer/2.8//paranamer-2.8.jar protobuf-java/3.21.7//protobuf-java-3.21.7.jar -ratis-client/3.0.1//ratis-client-3.0.1.jar -ratis-common/3.0.1//ratis-common-3.0.1.jar -ratis-grpc/3.0.1//ratis-grpc-3.0.1.jar -ratis-metrics-default/3.0.1/ratis-metrics-default-3.0.1.jar -ratis-netty/3.0.1//ratis-netty-3.0.1.jar -ratis-proto/3.0.1//ratis-proto-3.0.1.jar -ratis-server-api/3.0.1//ratis-server-api-3.0.1.jar -ratis-server/3.0.1//ratis-server-3.0.1.jar -ratis-shell/3.0.1//ratis-shell-3.0.1.jar +ratis-client/3.1.0//ratis-client-3.1.0.jar +ratis-common/3.1.0//ratis-common-3.1.0.jar +ratis-grpc/3.1.0//ratis-grpc-3.1.0.jar +ratis-metrics-default/3.1.0/ratis-metrics-default-3.1.0.jar +ratis-netty/3.1.0//ratis-netty-3.1.0.jar +ratis-proto/3.1.0//ratis-proto-3.1.0.jar +ratis-server-api/3.1.0//ratis-server-api-3.1.0.jar +ratis-server/3.1.0//ratis-server-3.1.0.jar +ratis-shell/3.1.0//ratis-shell-3.1.0.jar ratis-thirdparty-misc/1.0.5//ratis-thirdparty-misc-1.0.5.jar reflections/0.10.2//reflections-0.10.2.jar rocksdbjni/8.11.3//rocksdbjni-8.11.3.jar diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java deleted file mode 100644 index 5035a838089..00000000000 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/CelebornStateMachineStorage.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.celeborn.service.deploy.master.clustermeta.ha; - -import static org.apache.ratis.util.MD5FileUtil.MD5_SUFFIX; - -import java.io.File; -import java.io.IOException; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.ratis.io.MD5Hash; -import org.apache.ratis.server.storage.FileInfo; -import org.apache.ratis.server.storage.RaftStorage; -import org.apache.ratis.statemachine.SnapshotRetentionPolicy; -import org.apache.ratis.statemachine.StateMachineStorage; -import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; -import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; -import org.apache.ratis.util.FileUtils; -import org.apache.ratis.util.MD5FileUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Copied from Apache Ratis {@link SimpleStateMachineStorage}, We need to refresh latest snapshot - * after installing snapshot from leader which makes StateMachine load latest snapshot correctly. - */ -public class CelebornStateMachineStorage implements StateMachineStorage { - - private static final Logger LOG = LoggerFactory.getLogger(CelebornStateMachineStorage.class); - - static final String SNAPSHOT_FILE_PREFIX = "snapshot"; - /** snapshot.term_index */ - public static final Pattern SNAPSHOT_REGEX = - Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)"); - - public static final Pattern SNAPSHOT_MD5_REGEX = - Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)" + MD5_SUFFIX); - private static final DirectoryStream.Filter SNAPSHOT_MD5_FILTER = - entry -> - Optional.ofNullable(entry.getFileName()) - .map(Path::toString) - .map(SNAPSHOT_MD5_REGEX::matcher) - .filter(Matcher::matches) - .isPresent(); - - private volatile File stateMachineDir = null; - private final AtomicReference latestSnapshot = new AtomicReference<>(); - - File tmpDir = null; - - @Override - public void init(RaftStorage storage) throws IOException { - this.stateMachineDir = storage.getStorageDir().getStateMachineDir(); - loadLatestSnapshot(); - tmpDir = storage.getStorageDir().getTmpDir(); - } - - @Override - public void format() throws IOException { - // TODO - } - - static List getSingleFileSnapshotInfos(Path dir) throws IOException { - final List infos = new ArrayList<>(); - try (DirectoryStream stream = Files.newDirectoryStream(dir)) { - for (Path path : stream) { - final Path filename = path.getFileName(); - if (filename != null) { - final Matcher matcher = SNAPSHOT_REGEX.matcher(filename.toString()); - if (matcher.matches()) { - final long term = Long.parseLong(matcher.group(1)); - final long index = Long.parseLong(matcher.group(2)); - final FileInfo fileInfo = new FileInfo(path, null); // No FileDigest here. - infos.add(new SingleFileSnapshotInfo(fileInfo, term, index)); - } - } - } - } - return infos; - } - - @Override - public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) - throws IOException { - if (stateMachineDir == null) { - return; - } - - final int numSnapshotsRetained = - Optional.ofNullable(snapshotRetentionPolicy) - .map(SnapshotRetentionPolicy::getNumSnapshotsRetained) - .orElse(SnapshotRetentionPolicy.DEFAULT_ALL_SNAPSHOTS_RETAINED); - if (numSnapshotsRetained <= 0) { - return; - } - - final List allSnapshotFiles = - getSingleFileSnapshotInfos(stateMachineDir.toPath()); - - if (allSnapshotFiles.size() > snapshotRetentionPolicy.getNumSnapshotsRetained()) { - allSnapshotFiles.sort(Comparator.comparing(SingleFileSnapshotInfo::getIndex).reversed()); - allSnapshotFiles.subList(numSnapshotsRetained, allSnapshotFiles.size()).stream() - .map(SingleFileSnapshotInfo::getFile) - .map(FileInfo::getPath) - .forEach( - snapshotPath -> { - LOG.info("Deleting old snapshot at {}", snapshotPath.toAbsolutePath()); - FileUtils.deletePathQuietly(snapshotPath); - }); - // clean up the md5 files if the corresponding snapshot file does not exist - try (DirectoryStream stream = - Files.newDirectoryStream(stateMachineDir.toPath(), SNAPSHOT_MD5_FILTER)) { - for (Path md5path : stream) { - Path md5FileNamePath = md5path.getFileName(); - if (md5FileNamePath == null) { - continue; - } - final String md5FileName = md5FileNamePath.toString(); - final File snapshotFile = - new File( - stateMachineDir, - md5FileName.substring(0, md5FileName.length() - MD5_SUFFIX.length())); - if (!snapshotFile.exists()) { - FileUtils.deletePathQuietly(md5path); - } - } - } - } - } - - public File getSnapshotFile(long term, long endIndex) { - final File dir = Objects.requireNonNull(stateMachineDir, "stateMachineDir == null"); - return new File(dir, getSnapshotFileName(term, endIndex)); - } - - static SingleFileSnapshotInfo findLatestSnapshot(Path dir) throws IOException { - final Iterator i = getSingleFileSnapshotInfos(dir).iterator(); - if (!i.hasNext()) { - return null; - } - - SingleFileSnapshotInfo latest = i.next(); - for (; i.hasNext(); ) { - final SingleFileSnapshotInfo info = i.next(); - if (info.getIndex() > latest.getIndex()) { - latest = info; - } - } - - // read md5 - final Path path = latest.getFile().getPath(); - final MD5Hash md5 = MD5FileUtil.readStoredMd5ForFile(path.toFile()); - final FileInfo info = new FileInfo(path, md5); - return new SingleFileSnapshotInfo(info, latest.getTerm(), latest.getIndex()); - } - - public SingleFileSnapshotInfo updateLatestSnapshot(SingleFileSnapshotInfo info) { - return latestSnapshot.updateAndGet( - previous -> previous == null || info.getIndex() > previous.getIndex() ? info : previous); - } - - public static String getSnapshotFileName(long term, long endIndex) { - return SNAPSHOT_FILE_PREFIX + "." + term + "_" + endIndex; - } - - @Override - public SingleFileSnapshotInfo getLatestSnapshot() { - return latestSnapshot.get(); - } - - @Override - public File getTmpDir() { - return tmpDir; - } - - public void loadLatestSnapshot() { - final File dir = stateMachineDir; - if (dir == null) { - return; - } - try { - updateLatestSnapshot(findLatestSnapshot(dir.toPath())); - } catch (IOException ignored) { - } - } -} diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java index 8307d60577f..71bcaf064c5 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAHelper.java @@ -23,6 +23,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.protocol.Message; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.celeborn.common.client.MasterNotLeaderException; @@ -123,8 +124,7 @@ public static Message convertResponseToMessage(ResourceProtos.ResourceResponse r * @return the temporary snapshot file * @throws IOException if error occurred while creating the snapshot file */ - public static File createTempSnapshotFile(CelebornStateMachineStorage storage) - throws IOException { + public static File createTempSnapshotFile(SimpleStateMachineStorage storage) throws IOException { File tempDir = storage.getTmpDir(); if (!tempDir.isDirectory() && !tempDir.mkdir()) { throw new IOException( diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java index ff43e59c211..40d23c59f41 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java @@ -45,6 +45,7 @@ import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.ExitUtils; @@ -60,7 +61,22 @@ public class StateMachine extends BaseStateMachine { private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class); - private final CelebornStateMachineStorage storage = new CelebornStateMachineStorage(); + private final SimpleStateMachineStorage storage = + new SimpleStateMachineStorage() { + + File tmpDir = null; + + @Override + public void init(RaftStorage storage) throws IOException { + super.init(storage); + tmpDir = storage.getStorageDir().getTmpDir(); + } + + @Override + public File getTmpDir() { + return tmpDir; + } + }; private final HARaftServer masterRatisServer; private RaftGroupId raftGroupId; diff --git a/pom.xml b/pom.xml index e93b7d442fa..8ada82b3f65 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,7 @@ 4.1.109.Final 1.77 3.21.7 - 3.0.1 + 3.1.0 3.2.16 1.7.36 1.0.6 diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala index 00809d8fa51..15d45f09cba 100644 --- a/project/CelebornBuild.scala +++ b/project/CelebornBuild.scala @@ -58,7 +58,7 @@ object Dependencies { val metricsVersion = "4.2.25" val mockitoVersion = "4.11.0" val nettyVersion = "4.1.109.Final" - val ratisVersion = "3.0.1" + val ratisVersion = "3.1.0" val roaringBitmapVersion = "1.0.6" val rocksdbJniVersion = "8.11.3" val jacksonVersion = "2.15.3"