diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java
deleted file mode 100644
index 0f441fe01a368..0000000000000
--- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.gateway.remote;
-
-import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
-import org.opensearch.common.blobstore.BlobPath;
-import org.opensearch.common.settings.Settings;
-import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
-import org.opensearch.repositories.RepositoriesService;
-import org.opensearch.repositories.blobstore.BlobStoreRepository;
-import org.opensearch.test.OpenSearchIntegTestCase;
-import org.junit.Before;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT;
-import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
-import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
-import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
-import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
-
-@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
-public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase {
-
-    private static final String INDEX_NAME = "test-index";
-
-    @Before
-    public void setup() {
-        asyncUploadMockFsRepo = false;
-    }
-
-    @Override
-    protected Settings nodeSettings(int nodeOrdinal) {
-        return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
-    }
-
-    private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) {
-        prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
-        Map<String, Long> indexStats = indexData(1, false, INDEX_NAME);
-        assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards);
-        ensureGreen(INDEX_NAME);
-        return indexStats;
-    }
-
-    public void testRemoteCleanupTaskUpdated() {
-        int shardCount = randomIntBetween(1, 2);
-        int replicaCount = 1;
-        int dataNodeCount = shardCount * (replicaCount + 1);
-        int clusterManagerNodeCount = 1;
-
-        initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
-        RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance(
-            RemoteClusterStateCleanupManager.class
-        );
-
-        assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval());
-        assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());
-
-        // now disable
-        client().admin()
-            .cluster()
-            .prepareUpdateSettings()
-            .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1))
-            .get();
-
-        assertEquals(-1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis());
-        assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());
-
-        // now set Clean up interval to 1 min
-        client().admin()
-            .cluster()
-            .prepareUpdateSettings()
-            .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
-            .get();
-        assertEquals(1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMinutes());
-    }
-
-    public void testRemoteCleanupDeleteStale() throws Exception {
-        int shardCount = randomIntBetween(1, 2);
-        int replicaCount = 1;
-        int dataNodeCount = shardCount * (replicaCount + 1);
-        int clusterManagerNodeCount = 1;
-
-        initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
-
-        // set cleanup interval to 100 ms to make the test faster
-        ClusterUpdateSettingsResponse response = client().admin()
-            .cluster()
-            .prepareUpdateSettings()
-            .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "100ms"))
-            .get();
-
-        assertTrue(response.isAcknowledged());
-
-        // update cluster state 21 times to ensure that clean up has run after this will upload 42 manifest files
-        // to repository, if manifest files are less than that it means clean up has run
-        updateClusterStateNTimes(RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1);
-
-        RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
-        BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
-        BlobPath baseMetadataPath = repository.basePath()
-            .add(
-                Base64.getUrlEncoder()
-                    .withoutPadding()
-                    .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
-            )
-            .add("cluster-state")
-            .add(getClusterState().metadata().clusterUUID());
-        BlobPath manifestContainerPath = baseMetadataPath.add("manifest");
-
-        assertBusy(() -> {
-            int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size();
-            logger.info("number of current manifest file: {}", manifestFiles);
-            // we can't guarantee that we have same number of manifest as Retained manifest in our repo as there can be other queued task
-            // other than replica count change which can upload new manifest files, that's why we check that number of manifests is between
-            // Retained manifests and Retained manifests + 2 * Skip cleanup state changes (each cluster state update uploads 2 manifests)
-            assertTrue(
-                "Current number of manifest files: " + manifestFiles,
-                manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES
-            );
-        }, 500, TimeUnit.MILLISECONDS);
-    }
-
-    private void updateClusterStateNTimes(int n) {
-        int newReplicaCount = randomIntBetween(0, 3);
-        for (int i = n; i > 0; i--) {
-            ClusterUpdateSettingsResponse response = client().admin()
-                .cluster()
-                .prepareUpdateSettings()
-                .setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), i, TimeUnit.SECONDS))
-                .get();
-            assertTrue(response.isAcknowledged());
-        }
-    }
-}
diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java
index ab2f0f0080566..42120aa32eb47 100644
--- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java
@@ -10,6 +10,7 @@
 
 import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
 import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.discovery.DiscoveryStats;
@@ -26,6 +27,7 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
@@ -49,6 +51,16 @@ protected Settings nodeSettings(int nodeOrdinal) {
         return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
     }
 
+    private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
+        internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
+        internalCluster().startDataOnlyNodes(numDataOnlyNodes);
+        for (String index : indices.split(",")) {
+            createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
+            ensureYellowAndNoInitializingShards(index);
+            ensureGreen(index);
+        }
+    }
+
     private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) {
         prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
         Map<String, Long> indexStats = indexData(1, false, INDEX_NAME);
@@ -57,6 +69,49 @@ private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int
         return indexStats;
     }
 
+    public void testFullClusterRestoreStaleDelete() throws Exception {
+        int shardCount = randomIntBetween(1, 2);
+        int replicaCount = 1;
+        int dataNodeCount = shardCount * (replicaCount + 1);
+        int clusterManagerNodeCount = 1;
+
+        initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
+        setReplicaCount(0);
+        setReplicaCount(2);
+        setReplicaCount(0);
+        setReplicaCount(1);
+        setReplicaCount(0);
+        setReplicaCount(1);
+        setReplicaCount(0);
+        setReplicaCount(2);
+        setReplicaCount(0);
+
+        RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
+            RemoteClusterStateService.class
+        );
+
+        RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
+
+        BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
+        BlobPath baseMetadataPath = repository.basePath()
+            .add(
+                Base64.getUrlEncoder()
+                    .withoutPadding()
+                    .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
+            )
+            .add("cluster-state")
+            .add(getClusterState().metadata().clusterUUID());
+
+        assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size());
+
+        Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestClusterState(
+            cluster().getClusterName(),
+            getClusterState().metadata().clusterUUID()
+        ).getMetadata().getIndices();
+        assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
+        assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
+    }
+
     public void testRemoteStateStats() {
         int shardCount = randomIntBetween(1, 2);
         int replicaCount = 1;
@@ -186,4 +241,12 @@ private void validateNodesStatsResponse(NodesStatsResponse nodesStatsResponse) {
         assertNotNull(nodesStatsResponse.getNodes().get(0));
         assertNotNull(nodesStatsResponse.getNodes().get(0).getDiscoveryStats());
     }
+
+    private void setReplicaCount(int replicaCount) {
+        client().admin()
+            .indices()
+            .prepareUpdateSettings(INDEX_NAME)
+            .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount))
+            .get();
+    }
 }
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
index 64efcee6ef1b5..740aee69f7d80 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
@@ -350,14 +350,4 @@ protected void restore(boolean restoreAllShards, String... indices) {
                 PlainActionFuture.newFuture()
             );
     }
-
-    protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
-        internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
-        internalCluster().startDataOnlyNodes(numDataOnlyNodes);
-        for (String index : indices.split(",")) {
-            createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
-            ensureYellowAndNoInitializingShards(index);
-            ensureGreen(index);
-        }
-    }
 }
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index 297fc98764d07..7814518af471b 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -104,7 +104,6 @@
 import org.opensearch.gateway.GatewayService;
 import org.opensearch.gateway.PersistedClusterStateService;
 import org.opensearch.gateway.ShardsBatchGatewayAllocator;
-import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager;
 import org.opensearch.gateway.remote.RemoteClusterStateService;
 import org.opensearch.http.HttpTransportSettings;
 import org.opensearch.index.IndexModule;
@@ -712,7 +711,6 @@ public void apply(Settings value, Settings current, Settings previous) {
                 SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL,
 
                 // Remote cluster state settings
-                RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
                 RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
                 RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
                 RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java
deleted file mode 100644
index 8a106a25e5630..0000000000000
--- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.gateway.remote;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.logging.log4j.util.Strings;
-import org.opensearch.cluster.ClusterState;
-import org.opensearch.cluster.service.ClusterApplierService;
-import org.opensearch.cluster.service.ClusterService;
-import org.opensearch.common.blobstore.BlobMetadata;
-import org.opensearch.common.blobstore.BlobPath;
-import org.opensearch.common.settings.ClusterSettings;
-import org.opensearch.common.settings.Setting;
-import org.opensearch.common.unit.TimeValue;
-import org.opensearch.common.util.concurrent.AbstractAsyncTask;
-import org.opensearch.core.action.ActionListener;
-import org.opensearch.index.translog.transfer.BlobStoreTransferService;
-import org.opensearch.threadpool.ThreadPool;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_FORMAT;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_FORMAT;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN;
-
-/**
- * A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task
- *
- * @opensearch.internal
- */
-public class RemoteClusterStateCleanupManager implements Closeable {
-
-    public static final int RETAINED_MANIFESTS = 10;
-    public static final int SKIP_CLEANUP_STATE_CHANGES = 10;
-    public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT = TimeValue.timeValueMinutes(5);
-    public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM = TimeValue.MINUS_ONE;
-
-    /**
-     * Setting to specify the interval to do run stale file cleanup job
-     * Min value -1 indicates that the stale file cleanup job should be disabled
-     */
-    public static final Setting<TimeValue> REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING = Setting.timeSetting(
-        "cluster.remote_store.state.cleanup_interval",
-        CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT,
-        CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM,
-        Setting.Property.NodeScope,
-        Setting.Property.Dynamic
-    );
-    private static final Logger logger = LogManager.getLogger(RemoteClusterStateCleanupManager.class);
-    private final RemoteClusterStateService remoteClusterStateService;
-    private final RemotePersistenceStats remoteStateStats;
-    private BlobStoreTransferService blobStoreTransferService;
-    private TimeValue staleFileCleanupInterval;
-    private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
-    private AsyncStaleFileDeletion staleFileDeletionTask;
-    private long lastCleanupAttemptStateVersion;
-    private final ThreadPool threadpool;
-    private final ClusterApplierService clusterApplierService;
-
-    public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) {
-        this.remoteClusterStateService = remoteClusterStateService;
-        this.remoteStateStats = remoteClusterStateService.getStats();
-        ClusterSettings clusterSettings = clusterService.getClusterSettings();
-        this.clusterApplierService = clusterService.getClusterApplierService();
-        this.staleFileCleanupInterval = clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING);
-        this.threadpool = remoteClusterStateService.getThreadpool();
-        // initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold
-        this.lastCleanupAttemptStateVersion = 0;
-        clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval);
-    }
-
-    void start() {
-        staleFileDeletionTask = new AsyncStaleFileDeletion(this);
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (staleFileDeletionTask != null) {
-            staleFileDeletionTask.close();
-        }
-    }
-
-    private BlobStoreTransferService getBlobStoreTransferService() {
-        if (blobStoreTransferService == null) {
-            blobStoreTransferService = new BlobStoreTransferService(remoteClusterStateService.getBlobStore(), threadpool);
-        }
-        return blobStoreTransferService;
-    }
-
-    private void updateCleanupInterval(TimeValue updatedInterval) {
-        this.staleFileCleanupInterval = updatedInterval;
-        logger.info("updated remote state cleanup interval to {}", updatedInterval);
-        // After updating the interval, we need to close the current task and create a new one which will run with updated interval
-        if (staleFileDeletionTask != null && !staleFileDeletionTask.getInterval().equals(updatedInterval)) {
-            staleFileDeletionTask.setInterval(updatedInterval);
-        }
-    }
-
-    // visible for testing
-    void cleanUpStaleFiles() {
-        ClusterState currentAppliedState = clusterApplierService.state();
-        if (currentAppliedState.nodes().isLocalNodeElectedClusterManager()) {
-            long cleanUpAttemptStateVersion = currentAppliedState.version();
-            assert Strings.isNotEmpty(currentAppliedState.getClusterName().value()) : "cluster name is not set";
-            assert Strings.isNotEmpty(currentAppliedState.metadata().clusterUUID()) : "cluster uuid is not set";
-            if (cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion > SKIP_CLEANUP_STATE_CHANGES) {
-                logger.info(
-                    "Cleaning up stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates",
-                    currentAppliedState.getClusterName().value(),
-                    currentAppliedState.metadata().clusterUUID(),
-                    cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion
-                );
-                this.deleteStaleClusterMetadata(
-                    currentAppliedState.getClusterName().value(),
-                    currentAppliedState.metadata().clusterUUID(),
-                    RETAINED_MANIFESTS
-                );
-                lastCleanupAttemptStateVersion = cleanUpAttemptStateVersion;
-            } else {
-                logger.debug(
-                    "Skipping cleanup of stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates, which is less than threshold {}",
-                    currentAppliedState.getClusterName().value(),
-                    currentAppliedState.metadata().clusterUUID(),
-                    cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion,
-                    SKIP_CLEANUP_STATE_CHANGES
-                );
-            }
-        } else {
-            logger.debug("Skipping cleanup task as local node is not elected Cluster Manager");
-        }
-    }
-
-    private void addStaleGlobalMetadataPath(String fileName, Set<String> filesToKeep, Set<String> staleGlobalMetadataPaths) {
-        if (!filesToKeep.contains(fileName)) {
-            String[] splitPath = fileName.split("/");
-            staleGlobalMetadataPaths.add(
-                new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
-                    splitPath[splitPath.length - 1]
-                )
-            );
-        }
-    }
-
-    // visible for testing
-    void deleteClusterMetadata(
-        String clusterName,
-        String clusterUUID,
-        List<BlobMetadata> activeManifestBlobMetadata,
-        List<BlobMetadata> staleManifestBlobMetadata
-    ) {
-        try {
-            Set<String> filesToKeep = new HashSet<>();
-            Set<String> staleManifestPaths = new HashSet<>();
-            Set<String> staleIndexMetadataPaths = new HashSet<>();
-            Set<String> staleGlobalMetadataPaths = new HashSet<>();
-            activeManifestBlobMetadata.forEach(blobMetadata -> {
-                ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
-                    clusterName,
-                    clusterUUID,
-                    blobMetadata.name()
-                );
-                clusterMetadataManifest.getIndices()
-                    .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
-                if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
-                    filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName());
-                } else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
-                    filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename());
-                    filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename());
-                    filesToKeep.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename());
-                    clusterMetadataManifest.getCustomMetadataMap()
-                        .values()
-                        .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename()));
-                }
-            });
-            staleManifestBlobMetadata.forEach(blobMetadata -> {
-                ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
-                    clusterName,
-                    clusterUUID,
-                    blobMetadata.name()
-                );
-                staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name());
-                if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
-                    addStaleGlobalMetadataPath(clusterMetadataManifest.getGlobalMetadataFileName(), filesToKeep, staleGlobalMetadataPaths);
-                } else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
-                    addStaleGlobalMetadataPath(
-                        clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename(),
-                        filesToKeep,
-                        staleGlobalMetadataPaths
-                    );
-                    addStaleGlobalMetadataPath(
-                        clusterMetadataManifest.getSettingsMetadata().getUploadedFilename(),
-                        filesToKeep,
-                        staleGlobalMetadataPaths
-                    );
-                    addStaleGlobalMetadataPath(
-                        clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename(),
-                        filesToKeep,
-                        staleGlobalMetadataPaths
-                    );
-                    clusterMetadataManifest.getCustomMetadataMap()
-                        .values()
-                        .forEach(
-                            attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths)
-                        );
-                }
-
-                clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
-                    if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
-                        staleIndexMetadataPaths.add(
-                            new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
-                                + INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename())
-                        );
-                    }
-                });
-            });
-
-            if (staleManifestPaths.isEmpty()) {
-                logger.debug("No stale Remote Cluster Metadata files found");
-                return;
-            }
-
-            deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
-            deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
-            deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
-        } catch (IllegalStateException e) {
-            logger.error("Error while fetching Remote Cluster Metadata manifests", e);
-        } catch (IOException e) {
-            logger.error("Error while deleting stale Remote Cluster Metadata files", e);
-            remoteStateStats.cleanUpAttemptFailed();
-        } catch (Exception e) {
-            logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e);
-            remoteStateStats.cleanUpAttemptFailed();
-        }
-    }
-
-    /**
-     * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests
-     *
-     * @param clusterName name of the cluster
-     * @param clusterUUID uuid of cluster state to refer to in remote
-     * @param manifestsToRetain no of latest manifest files to keep in remote
-     */
-    // package private for testing
-    void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) {
-        if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) {
-            logger.info("Delete stale cluster metadata task is already in progress.");
-            return;
-        }
-        try {
-            getBlobStoreTransferService().listAllInSortedOrderAsync(
-                ThreadPool.Names.REMOTE_PURGE,
-                remoteClusterStateService.getManifestFolderPath(clusterName, clusterUUID),
-                MANIFEST_FILE_PREFIX,
-                Integer.MAX_VALUE,
-                new ActionListener<>() {
-                    @Override
-                    public void onResponse(List<BlobMetadata> blobMetadata) {
-                        if (blobMetadata.size() > manifestsToRetain) {
-                            deleteClusterMetadata(
-                                clusterName,
-                                clusterUUID,
-                                blobMetadata.subList(0, manifestsToRetain),
-                                blobMetadata.subList(manifestsToRetain, blobMetadata.size())
-                            );
-                        }
-                        deleteStaleMetadataRunning.set(false);
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        logger.error(
-                            new ParameterizedMessage(
-                                "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
-                                clusterUUID
-                            )
-                        );
-                        deleteStaleMetadataRunning.set(false);
-                    }
-                }
-            );
-        } catch (Exception e) {
-            deleteStaleMetadataRunning.set(false);
-            throw e;
-        }
-    }
-
-    /**
-     * Purges all remote cluster state against provided cluster UUIDs
-     *
-     * @param clusterName name of the cluster
-     * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged
-     */
-    void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUIDs) {
-        clusterUUIDs.forEach(
-            clusterUUID -> getBlobStoreTransferService().deleteAsync(
-                ThreadPool.Names.REMOTE_PURGE,
-                remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
-                new ActionListener<>() {
-                    @Override
-                    public void onResponse(Void unused) {
-                        logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID);
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        logger.error(
-                            new ParameterizedMessage(
-                                "Exception occurred while deleting all remote cluster metadata for cluster UUID {}",
-                                clusterUUID
-                            ),
-                            e
-                        );
-                        remoteStateStats.cleanUpAttemptFailed();
-                    }
-                }
-            )
-        );
-    }
-
-    // package private for testing
-    void deleteStalePaths(String clusterName, String clusterUUID, List<String> stalePaths) throws IOException {
-        logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
-        getBlobStoreTransferService().deleteBlobs(
-            remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
-            stalePaths
-        );
-    }
-
-    /**
-     * Purges all remote cluster state against provided cluster UUIDs
-     * @param clusterState current state of the cluster
-     * @param committedManifest last committed ClusterMetadataManifest
-     */
-    public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) {
-        threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
-            String clusterName = clusterState.getClusterName().value();
-            logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
-            Set<String> allClustersUUIDsInRemote;
-            try {
-                allClustersUUIDsInRemote = new HashSet<>(
-                    remoteClusterStateService.getAllClusterUUIDs(clusterState.getClusterName().value())
-                );
-            } catch (IOException e) {
-                logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName));
-                return;
-            }
-            // Retain last 2 cluster uuids data
-            allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID());
-            allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID());
-            deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote));
-        });
-    }
-
-    public TimeValue getStaleFileCleanupInterval() {
-        return this.staleFileCleanupInterval;
-    }
-
-    AsyncStaleFileDeletion getStaleFileDeletionTask() { // for testing
-        return this.staleFileDeletionTask;
-    }
-
-    RemotePersistenceStats getStats() {
-        return this.remoteStateStats;
-    }
-
-    static final class AsyncStaleFileDeletion extends AbstractAsyncTask {
-        private final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
-
-        AsyncStaleFileDeletion(RemoteClusterStateCleanupManager remoteClusterStateCleanupManager) {
-            super(
-                logger,
-                remoteClusterStateCleanupManager.threadpool,
-                remoteClusterStateCleanupManager.getStaleFileCleanupInterval(),
-                true
-            );
-            this.remoteClusterStateCleanupManager = remoteClusterStateCleanupManager;
-            rescheduleIfNecessary();
-        }
-
-        @Override
-        protected boolean mustReschedule() {
-            return true;
-        }
-
-        @Override
-        protected void runInternal() {
-            remoteClusterStateCleanupManager.cleanUpStaleFiles();
-        }
-    }
-}
diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
index 01ffd8f1cca46..0f862d1b68820 100644
--- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
+++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
@@ -18,13 +18,11 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.metadata.TemplatesMetadata;
-import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.CheckedRunnable;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.blobstore.BlobMetadata;
 import org.opensearch.common.blobstore.BlobPath;
-import org.opensearch.common.blobstore.BlobStore;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Setting;
 import org.opensearch.common.settings.Setting.Property;
@@ -61,6 +59,7 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
@@ -82,6 +81,8 @@ public class RemoteClusterStateService implements Closeable {
 
     public static final String METADATA_MANIFEST_NAME_FORMAT = "%s";
 
+    public static final int RETAINED_MANIFESTS = 10;
+
     public static final String DELIMITER = "__";
     public static final String CUSTOM_DELIMITER = "--";
 
@@ -206,7 +207,8 @@ public class RemoteClusterStateService implements Closeable {
     private volatile TimeValue indexMetadataUploadTimeout;
     private volatile TimeValue globalMetadataUploadTimeout;
     private volatile TimeValue metadataManifestUploadTimeout;
-    private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
+
+    private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
     private final RemotePersistenceStats remoteStateStats;
     private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]";
     private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
@@ -230,7 +232,7 @@ public RemoteClusterStateService(
         String nodeId,
         Supplier<RepositoriesService> repositoriesService,
         Settings settings,
-        ClusterService clusterService,
+        ClusterSettings clusterSettings,
         LongSupplier relativeTimeNanosSupplier,
         ThreadPool threadPool,
         List<IndexMetadataUploadListener> indexMetadataUploadListeners
@@ -241,7 +243,6 @@ public RemoteClusterStateService(
         this.settings = settings;
         this.relativeTimeNanosSupplier = relativeTimeNanosSupplier;
         this.threadpool = threadPool;
-        ClusterSettings clusterSettings = clusterService.getClusterSettings();
         this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
         this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
         this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
@@ -251,10 +252,16 @@ public RemoteClusterStateService(
         clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
         clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
         this.remoteStateStats = new RemotePersistenceStats();
-        this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
         this.indexMetadataUploadListeners = indexMetadataUploadListeners;
     }
 
+    private BlobStoreTransferService getBlobStoreTransferService() {
+        if (blobStoreTransferService == null) {
+            blobStoreTransferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool);
+        }
+        return blobStoreTransferService;
+    }
+
     /**
      * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be
      * invoked by the elected cluster manager when the remote cluster state is enabled.
@@ -410,6 +417,7 @@ public ClusterMetadataManifest writeIncrementalMetadata(
                 : previousManifest.getCustomMetadataMap(),
             false
         );
+        deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS);
 
         final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
         remoteStateStats.stateSucceeded();
@@ -713,10 +721,6 @@ private CheckedRunnable<IOException> getAsyncMetadataWriteAction(
         );
     }
 
-    public RemoteClusterStateCleanupManager getCleanupManager() {
-        return remoteClusterStateCleanupManager;
-    }
-
     @Nullable
     public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest)
         throws IOException {
@@ -736,16 +740,12 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat
             previousManifest.getCustomMetadataMap(),
             true
         );
-        if (!previousManifest.isClusterUUIDCommitted() && committedManifest.isClusterUUIDCommitted()) {
-            remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifest);
-        }
-
+        deleteStaleClusterUUIDs(clusterState, committedManifest);
         return committedManifest;
     }
 
     @Override
     public void close() throws IOException {
-        remoteClusterStateCleanupManager.close();
         if (blobStoreRepository != null) {
             IOUtils.close(blobStoreRepository);
         }
@@ -760,7 +760,6 @@ public void start() {
         final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
         assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
         blobStoreRepository = (BlobStoreRepository) repository;
-        remoteClusterStateCleanupManager.start();
     }
 
     private ClusterMetadataManifest uploadManifest(
@@ -851,14 +850,6 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust
         );
     }
 
-    ThreadPool getThreadpool() {
-        return threadpool;
-    }
-
-    BlobStore getBlobStore() {
-        return blobStoreRepository.blobStore();
-    }
-
     private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) {
         // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
         return blobStoreRepository.blobStore()
@@ -876,7 +867,7 @@ private BlobContainer manifestContainer(String clusterName, String clusterUUID)
         return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID));
     }
 
-    BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) {
+    private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) {
         return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID);
     }
 
@@ -991,7 +982,7 @@ private static String metadataAttributeFileName(String componentPrefix, Long met
         );
     }
 
-    BlobPath getManifestFolderPath(String clusterName, String clusterUUID) {
+    private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) {
         return getCusterMetadataBasePath(clusterName, clusterUUID).add(MANIFEST_PATH_TOKEN);
     }
 
@@ -1244,7 +1235,7 @@ public String getLastKnownUUIDFromRemote(String clusterName) {
         }
     }
 
-    Set<String> getAllClusterUUIDs(String clusterName) throws IOException {
+    private Set<String> getAllClusterUUIDs(String clusterName) throws IOException {
         Map<String, BlobContainer> clusterUUIDMetadata = clusterUUIDContainer(clusterName).children();
         if (clusterUUIDMetadata == null) {
             return Collections.emptySet();
@@ -1435,7 +1426,7 @@ private Optional<String> getLatestManifestFileName(String clusterName, String cl
      * @param clusterName name of the cluster
      * @return ClusterMetadataManifest
      */
-    ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename)
+    private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename)
         throws IllegalStateException {
         try {
             return getClusterMetadataManifestBlobStoreFormat(filename).read(
@@ -1495,6 +1486,234 @@ public RemoteStateTransferException(String errorDesc, Throwable cause) {
         }
     }
 
+    /**
+     * Purges all remote cluster state against provided cluster UUIDs
+     *
+     * @param clusterName  name of the cluster
+     * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged
+     */
+    void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUIDs) {
+        clusterUUIDs.forEach(clusterUUID -> {
+            getBlobStoreTransferService().deleteAsync(
+                ThreadPool.Names.REMOTE_PURGE,
+                getCusterMetadataBasePath(clusterName, clusterUUID),
+                new ActionListener<>() {
+                    @Override
+                    public void onResponse(Void unused) {
+                        logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.error(
+                            new ParameterizedMessage(
+                                "Exception occurred while deleting all remote cluster metadata for cluster UUID {}",
+                                clusterUUID
+                            ),
+                            e
+                        );
+                        remoteStateStats.cleanUpAttemptFailed();
+                    }
+                }
+            );
+        });
+    }
+
+    /**
+     * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests
+     *
+     * @param clusterName       name of the cluster
+     * @param clusterUUID       uuid of cluster state to refer to in remote
+     * @param manifestsToRetain no of latest manifest files to keep in remote
+     */
+    // package private for testing
+    void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) {
+        if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) {
+            logger.info("Delete stale cluster metadata task is already in progress.");
+            return;
+        }
+        try {
+            getBlobStoreTransferService().listAllInSortedOrderAsync(
+                ThreadPool.Names.REMOTE_PURGE,
+                getManifestFolderPath(clusterName, clusterUUID),
+                "manifest",
+                Integer.MAX_VALUE,
+                new ActionListener<>() {
+                    @Override
+                    public void onResponse(List<BlobMetadata> blobMetadata) {
+                        if (blobMetadata.size() > manifestsToRetain) {
+                            deleteClusterMetadata(
+                                clusterName,
+                                clusterUUID,
+                                blobMetadata.subList(0, manifestsToRetain - 1),
+                                blobMetadata.subList(manifestsToRetain - 1, blobMetadata.size())
+                            );
+                        }
+                        deleteStaleMetadataRunning.set(false);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.error(
+                            new ParameterizedMessage(
+                                "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
+                                clusterUUID
+                            )
+                        );
+                        deleteStaleMetadataRunning.set(false);
+                    }
+                }
+            );
+        } catch (Exception e) {
+            deleteStaleMetadataRunning.set(false);
+            throw e;
+        }
+    }
+
+    private void deleteClusterMetadata(
+        String clusterName,
+        String clusterUUID,
+        List<BlobMetadata> activeManifestBlobMetadata,
+        List<BlobMetadata> staleManifestBlobMetadata
+    ) {
+        try {
+            Set<String> filesToKeep = new HashSet<>();
+            Set<String> staleManifestPaths = new HashSet<>();
+            Set<String> staleIndexMetadataPaths = new HashSet<>();
+            Set<String> staleGlobalMetadataPaths = new HashSet<>();
+            activeManifestBlobMetadata.forEach(blobMetadata -> {
+                ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
+                    clusterName,
+                    clusterUUID,
+                    blobMetadata.name()
+                );
+                clusterMetadataManifest.getIndices()
+                    .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
+                if (clusterMetadataManifest.getGlobalMetadataFileName() != null) {
+                    filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName());
+                } else {
+                    filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename());
+                    filesToKeep.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename());
+                    filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename());
+                    clusterMetadataManifest.getCustomMetadataMap()
+                        .forEach((key, value) -> { filesToKeep.add(value.getUploadedFilename()); });
+                }
+            });
+            staleManifestBlobMetadata.forEach(blobMetadata -> {
+                ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
+                    clusterName,
+                    clusterUUID,
+                    blobMetadata.name()
+                );
+                staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name());
+                if (clusterMetadataManifest.getGlobalMetadataFileName() != null) {
+                    if (filesToKeep.contains(clusterMetadataManifest.getGlobalMetadataFileName()) == false) {
+                        String[] globalMetadataSplitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/");
+                        staleGlobalMetadataPaths.add(
+                            new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
+                                globalMetadataSplitPath[globalMetadataSplitPath.length - 1]
+                            )
+                        );
+                    }
+                } else {
+                    if (filesToKeep.contains(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()) == false) {
+                        String[] coordinationMetadataSplitPath = clusterMetadataManifest.getCoordinationMetadata()
+                            .getUploadedFilename()
+                            .split("/");
+                        staleGlobalMetadataPaths.add(
+                            new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
+                                coordinationMetadataSplitPath[coordinationMetadataSplitPath.length - 1]
+                            )
+                        );
+                    }
+                    if (filesToKeep.contains(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()) == false) {
+                        String[] templatesMetadataSplitPath = clusterMetadataManifest.getTemplatesMetadata()
+                            .getUploadedFilename()
+                            .split("/");
+                        staleGlobalMetadataPaths.add(
+                            new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
+                                templatesMetadataSplitPath[templatesMetadataSplitPath.length - 1]
+                            )
+                        );
+                    }
+                    if (filesToKeep.contains(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()) == false) {
+                        String[] settingsMetadataSplitPath = clusterMetadataManifest.getSettingsMetadata().getUploadedFilename().split("/");
+                        staleGlobalMetadataPaths.add(
+                            new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
+                                settingsMetadataSplitPath[settingsMetadataSplitPath.length - 1]
+                            )
+                        );
+                    }
+                    clusterMetadataManifest.getCustomMetadataMap().forEach((key, value) -> {
+                        if (filesToKeep.contains(value.getUploadedFilename()) == false) {
+                            String[] customMetadataSplitPath = value.getUploadedFilename().split("/");
+                            staleGlobalMetadataPaths.add(
+                                new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
+                                    customMetadataSplitPath[customMetadataSplitPath.length - 1]
+                                )
+                            );
+                        }
+                    });
+                }
+
+                clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
+                    if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
+                        staleIndexMetadataPaths.add(
+                            new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
+                                + INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename())
+                        );
+                    }
+                });
+            });
+
+            if (staleManifestPaths.isEmpty()) {
+                logger.debug("No stale Remote Cluster Metadata files found");
+                return;
+            }
+
+            deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
+            deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
+            deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
+        } catch (IllegalStateException e) {
+            logger.error("Error while fetching Remote Cluster Metadata manifests", e);
+        } catch (IOException e) {
+            logger.error("Error while deleting stale Remote Cluster Metadata files", e);
+            remoteStateStats.cleanUpAttemptFailed();
+        } catch (Exception e) {
+            logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e);
+            remoteStateStats.cleanUpAttemptFailed();
+        }
+    }
+
+    private void deleteStalePaths(String clusterName, String clusterUUID, List<String> stalePaths) throws IOException {
+        logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
+        getBlobStoreTransferService().deleteBlobs(getCusterMetadataBasePath(clusterName, clusterUUID), stalePaths);
+    }
+
+    /**
+     * Purges all remote cluster state against provided cluster UUIDs
+     *
+     * @param clusterState      current state of the cluster
+     * @param committedManifest last committed ClusterMetadataManifest
+     */
+    public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) {
+        threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
+            String clusterName = clusterState.getClusterName().value();
+            logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
+            Set<String> allClustersUUIDsInRemote;
+            try {
+                allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value()));
+            } catch (IOException e) {
+                logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName));
+                return;
+            }
+            // Retain last 2 cluster uuids data
+            allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID());
+            allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID());
+            deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote));
+        });
+    }
+
     public RemotePersistenceStats getStats() {
         return remoteStateStats;
     }
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index 49545fa8a0c8b..76109ba10624a 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -138,7 +138,6 @@
 import org.opensearch.gateway.MetaStateService;
 import org.opensearch.gateway.PersistedClusterStateService;
 import org.opensearch.gateway.ShardsBatchGatewayAllocator;
-import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager;
 import org.opensearch.gateway.remote.RemoteClusterStateService;
 import org.opensearch.http.HttpServerTransport;
 import org.opensearch.identity.IdentityService;
@@ -753,7 +752,6 @@ protected Node(
                 threadPool::relativeTimeInMillis
             );
             final RemoteClusterStateService remoteClusterStateService;
-            final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
             final RemoteIndexPathUploader remoteIndexPathUploader;
             if (isRemoteStoreClusterStateEnabled(settings)) {
                 remoteIndexPathUploader = new RemoteIndexPathUploader(
@@ -766,16 +764,14 @@ protected Node(
                     nodeEnvironment.nodeId(),
                     repositoriesServiceReference::get,
                     settings,
-                    clusterService,
+                    clusterService.getClusterSettings(),
                     threadPool::preciseRelativeTimeInNanos,
                     threadPool,
                     List.of(remoteIndexPathUploader)
                 );
-                remoteClusterStateCleanupManager = remoteClusterStateService.getCleanupManager();
             } else {
                 remoteClusterStateService = null;
                 remoteIndexPathUploader = null;
-                remoteClusterStateCleanupManager = null;
             }
 
             // collect engine factory providers from plugins
@@ -1380,7 +1376,6 @@ protected Node(
                 b.bind(MetricsRegistry.class).toInstance(metricsRegistry);
                 b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
                 b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader);
-                b.bind(RemoteClusterStateCleanupManager.class).toProvider(() -> remoteClusterStateCleanupManager);
                 b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
                 b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
                 b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory);
diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
index 418e6d8de6adb..3ba98c44f8d3e 100644
--- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
+++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
@@ -462,7 +462,9 @@ public void testDataOnlyNodePersistence() throws Exception {
             });
             when(transportService.getThreadPool()).thenReturn(threadPool);
             ClusterService clusterService = mock(ClusterService.class);
-            when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
+            when(clusterService.getClusterSettings()).thenReturn(
+                new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
+            );
             final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService(
                 nodeEnvironment,
                 xContentRegistry(),
@@ -485,7 +487,7 @@ public void testDataOnlyNodePersistence() throws Exception {
                         nodeEnvironment.nodeId(),
                         repositoriesServiceSupplier,
                         settings,
-                        clusterService,
+                        clusterSettings,
                         () -> 0L,
                         threadPool,
                         List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings))
diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java
deleted file mode 100644
index 24fd1b164a4ff..0000000000000
--- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java
+++ /dev/null
@@ -1,446 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.gateway.remote;
-
-import org.opensearch.cluster.ClusterName;
-import org.opensearch.cluster.ClusterState;
-import org.opensearch.cluster.metadata.Metadata;
-import org.opensearch.cluster.node.DiscoveryNodes;
-import org.opensearch.cluster.service.ClusterApplierService;
-import org.opensearch.cluster.service.ClusterService;
-import org.opensearch.common.blobstore.BlobContainer;
-import org.opensearch.common.blobstore.BlobMetadata;
-import org.opensearch.common.blobstore.BlobPath;
-import org.opensearch.common.blobstore.BlobStore;
-import org.opensearch.common.blobstore.support.PlainBlobMetadata;
-import org.opensearch.common.settings.ClusterSettings;
-import org.opensearch.common.settings.Settings;
-import org.opensearch.common.util.concurrent.AbstractAsyncTask;
-import org.opensearch.core.action.ActionListener;
-import org.opensearch.repositories.RepositoriesService;
-import org.opensearch.repositories.blobstore.BlobStoreRepository;
-import org.opensearch.repositories.fs.FsRepository;
-import org.opensearch.test.OpenSearchTestCase;
-import org.opensearch.test.VersionUtils;
-import org.opensearch.threadpool.TestThreadPool;
-import org.opensearch.threadpool.ThreadPool;
-import org.junit.After;
-import org.junit.Before;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
-
-import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1;
-import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
-import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
-import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
-import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion;
-import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT;
-import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
-import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
-import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.CLUSTER_STATE_PATH_TOKEN;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.encodeString;
-import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.generateClusterStateWithOneIndex;
-import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.nodesWithLocalNodeClusterManager;
-import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
-import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
-import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase {
-    private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
-    private Supplier<RepositoriesService> repositoriesServiceSupplier;
-    private RepositoriesService repositoriesService;
-    private BlobStoreRepository blobStoreRepository;
-    private BlobStore blobStore;
-    private ClusterSettings clusterSettings;
-    private ClusterApplierService clusterApplierService;
-    private ClusterState clusterState;
-    private Metadata metadata;
-    private RemoteClusterStateService remoteClusterStateService;
-    private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
-
-    @Before
-    public void setup() {
-        repositoriesServiceSupplier = mock(Supplier.class);
-        repositoriesService = mock(RepositoriesService.class);
-        when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService);
-
-        String stateRepoTypeAttributeKey = String.format(
-            Locale.getDefault(),
-            "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
-            "remote_store_repository"
-        );
-        String stateRepoSettingsAttributeKeyPrefix = String.format(
-            Locale.getDefault(),
-            "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
-            "remote_store_repository"
-        );
-
-        Settings settings = Settings.builder()
-            .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository")
-            .put(stateRepoTypeAttributeKey, FsRepository.TYPE)
-            .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath")
-            .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
-            .build();
-
-        clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
-        clusterApplierService = mock(ClusterApplierService.class);
-        clusterState = mock(ClusterState.class);
-        metadata = mock(Metadata.class);
-        ClusterService clusterService = mock(ClusterService.class);
-        when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
-        when(clusterState.getClusterName()).thenReturn(new ClusterName("test"));
-        when(metadata.clusterUUID()).thenReturn("testUUID");
-        when(clusterState.metadata()).thenReturn(metadata);
-        when(clusterApplierService.state()).thenReturn(clusterState);
-        when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
-
-        blobStoreRepository = mock(BlobStoreRepository.class);
-        blobStore = mock(BlobStore.class);
-        when(blobStoreRepository.blobStore()).thenReturn(blobStore);
-        when(repositoriesService.repository("remote_store_repository")).thenReturn(blobStoreRepository);
-
-        remoteClusterStateService = mock(RemoteClusterStateService.class);
-        when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats());
-        when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool);
-        when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore);
-        remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService);
-    }
-
-    @After
-    public void teardown() throws Exception {
-        super.tearDown();
-        remoteClusterStateCleanupManager.close();
-        threadPool.shutdown();
-    }
-
-    public void testDeleteClusterMetadata() throws IOException {
-        String clusterUUID = "clusterUUID";
-        String clusterName = "test-cluster";
-        List<BlobMetadata> inactiveBlobs = Arrays.asList(
-            new PlainBlobMetadata("manifest1.dat", 1L),
-            new PlainBlobMetadata("manifest2.dat", 1L),
-            new PlainBlobMetadata("manifest3.dat", 1L)
-        );
-        List<BlobMetadata> activeBlobs = Arrays.asList(
-            new PlainBlobMetadata("manifest4.dat", 1L),
-            new PlainBlobMetadata("manifest5.dat", 1L)
-        );
-        UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1");
-        UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2");
-        UploadedIndexMetadata index1UpdatedMetadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1_updated");
-        UploadedMetadataAttribute coordinationMetadata = new UploadedMetadataAttribute(COORDINATION_METADATA, "coordination_metadata");
-        UploadedMetadataAttribute templateMetadata = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata");
-        UploadedMetadataAttribute settingMetadata = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata");
-        UploadedMetadataAttribute coordinationMetadataUpdated = new UploadedMetadataAttribute(
-            COORDINATION_METADATA,
-            "coordination_metadata_updated"
-        );
-        UploadedMetadataAttribute templateMetadataUpdated = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata_updated");
-        UploadedMetadataAttribute settingMetadataUpdated = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata_updated");
-        ClusterMetadataManifest manifest1 = ClusterMetadataManifest.builder()
-            .indices(List.of(index1Metadata))
-            .globalMetadataFileName("global_metadata")
-            .clusterTerm(1L)
-            .stateVersion(1L)
-            .codecVersion(CODEC_V1)
-            .stateUUID(randomAlphaOfLength(10))
-            .clusterUUID(clusterUUID)
-            .nodeId("nodeA")
-            .opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
-            .previousClusterUUID(ClusterState.UNKNOWN_UUID)
-            .committed(true)
-            .build();
-        ClusterMetadataManifest manifest2 = ClusterMetadataManifest.builder(manifest1)
-            .indices(List.of(index1Metadata, index2Metadata))
-            .codecVersion(CODEC_V2)
-            .globalMetadataFileName(null)
-            .coordinationMetadata(coordinationMetadata)
-            .templatesMetadata(templateMetadata)
-            .settingMetadata(settingMetadata)
-            .build();
-        ClusterMetadataManifest manifest3 = ClusterMetadataManifest.builder(manifest2)
-            .indices(List.of(index1UpdatedMetadata, index2Metadata))
-            .settingMetadata(settingMetadataUpdated)
-            .build();
-
-        // active manifest have reference to index1Updated, index2, settingsUpdated, coordinationUpdated, templates, templatesUpdated
-        ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3)
-            .coordinationMetadata(coordinationMetadataUpdated)
-            .build();
-        ClusterMetadataManifest manifest5 = ClusterMetadataManifest.builder(manifest4).templatesMetadata(templateMetadataUpdated).build();
-
-        when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn(
-            manifest4,
-            manifest5,
-            manifest1,
-            manifest2,
-            manifest3
-        );
-        BlobContainer container = mock(BlobContainer.class);
-        when(blobStore.blobContainer(any())).thenReturn(container);
-        doNothing().when(container).deleteBlobsIgnoringIfNotExists(any());
-
-        remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs);
-        verify(container).deleteBlobsIgnoringIfNotExists(
-            List.of(
-                new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + coordinationMetadata.getUploadedFilename() + ".dat",
-                new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + settingMetadata.getUploadedFilename() + ".dat",
-                new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + "global_metadata.dat"
-            )
-        );
-        verify(container).deleteBlobsIgnoringIfNotExists(
-            List.of(
-                new BlobPath().add(INDEX_PATH_TOKEN).add(index1Metadata.getIndexUUID()).buildAsString()
-                    + index1Metadata.getUploadedFilePath()
-                    + ".dat"
-            )
-        );
-        Set<String> staleManifest = new HashSet<>();
-        inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name()));
-        verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest));
-    }
-
-    public void testDeleteStaleClusterUUIDs() throws IOException {
-        final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
-        ClusterMetadataManifest clusterMetadataManifest = ClusterMetadataManifest.builder()
-            .indices(List.of())
-            .clusterTerm(1L)
-            .stateVersion(1L)
-            .stateUUID(randomAlphaOfLength(10))
-            .clusterUUID("cluster-uuid1")
-            .nodeId("nodeA")
-            .opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
-            .previousClusterUUID(ClusterState.UNKNOWN_UUID)
-            .committed(true)
-            .build();
-
-        BlobPath blobPath = new BlobPath().add("random-path");
-        BlobContainer uuidContainerContainer = mock(BlobContainer.class);
-        BlobContainer manifest2Container = mock(BlobContainer.class);
-        BlobContainer manifest3Container = mock(BlobContainer.class);
-        when(blobStore.blobContainer(any())).then(invocation -> {
-            BlobPath blobPath1 = invocation.getArgument(0);
-            if (blobPath1.buildAsString().endsWith("cluster-state/")) {
-                return uuidContainerContainer;
-            } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid2/")) {
-                return manifest2Container;
-            } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid3/")) {
-                return manifest3Container;
-            } else {
-                throw new IllegalArgumentException("Unexpected blob path " + blobPath1);
-            }
-        });
-        when(
-            manifest2Container.listBlobsByPrefixInSortedOrder(
-                MANIFEST_FILE_PREFIX + DELIMITER,
-                Integer.MAX_VALUE,
-                BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
-            )
-        ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L)));
-        when(
-            manifest3Container.listBlobsByPrefixInSortedOrder(
-                MANIFEST_FILE_PREFIX + DELIMITER,
-                Integer.MAX_VALUE,
-                BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
-            )
-        ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L)));
-        Set<String> uuids = new HashSet<>(Arrays.asList("cluster-uuid1", "cluster-uuid2", "cluster-uuid3"));
-        when(remoteClusterStateService.getAllClusterUUIDs(any())).thenReturn(uuids);
-        when(remoteClusterStateService.getCusterMetadataBasePath(any(), any())).then(
-            invocationOnMock -> blobPath.add(encodeString(invocationOnMock.getArgument(0)))
-                .add(CLUSTER_STATE_PATH_TOKEN)
-                .add((String) invocationOnMock.getArgument(1))
-        );
-        remoteClusterStateCleanupManager.start();
-        remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest);
-        try {
-            assertBusy(() -> {
-                verify(manifest2Container, times(1)).delete();
-                verify(manifest3Container, times(1)).delete();
-            });
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void testRemoteStateCleanupFailureStats() throws IOException {
-        BlobContainer blobContainer = mock(BlobContainer.class);
-        doThrow(IOException.class).when(blobContainer).delete();
-        when(blobStore.blobContainer(any())).thenReturn(blobContainer);
-        BlobPath blobPath = new BlobPath().add("random-path");
-        when((blobStoreRepository.basePath())).thenReturn(blobPath);
-        remoteClusterStateCleanupManager.start();
-        remoteClusterStateCleanupManager.deleteStaleUUIDsClusterMetadata("cluster1", List.of("cluster-uuid1"));
-        try {
-            assertBusy(() -> {
-                // wait for stats to get updated
-                assertNotNull(remoteClusterStateCleanupManager.getStats());
-                assertEquals(0, remoteClusterStateCleanupManager.getStats().getSuccessCount());
-                assertEquals(1, remoteClusterStateCleanupManager.getStats().getCleanupAttemptFailedCount());
-            });
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception {
-        BlobContainer blobContainer = mock(BlobContainer.class);
-        when(blobStore.blobContainer(any())).thenReturn(blobContainer);
-
-        CountDownLatch latch = new CountDownLatch(1);
-        AtomicInteger callCount = new AtomicInteger(0);
-        doAnswer(invocation -> {
-            callCount.incrementAndGet();
-            if (latch.await(5000, TimeUnit.SECONDS) == false) {
-                throw new Exception("Timed out waiting for delete task queuing to complete");
-            }
-            return null;
-        }).when(blobContainer)
-            .listBlobsByPrefixInSortedOrder(
-                any(String.class),
-                any(int.class),
-                any(BlobContainer.BlobNameSortOrder.class),
-                any(ActionListener.class)
-            );
-
-        remoteClusterStateCleanupManager.start();
-        remoteClusterStateCleanupManager.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
-        remoteClusterStateCleanupManager.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
-
-        latch.countDown();
-        assertBusy(() -> assertEquals(1, callCount.get()));
-    }
-
-    public void testRemoteClusterStateCleanupSetting() {
-        remoteClusterStateCleanupManager.start();
-        // verify default value
-        assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileCleanupInterval());
-
-        // verify update interval
-        int cleanupInterval = randomIntBetween(1, 10);
-        Settings newSettings = Settings.builder().put("cluster.remote_store.state.cleanup_interval", cleanupInterval + "s").build();
-        clusterSettings.applySettings(newSettings);
-        assertEquals(cleanupInterval, remoteClusterStateCleanupManager.getStaleFileCleanupInterval().seconds());
-    }
-
-    public void testRemoteCleanupTaskScheduled() {
-        AbstractAsyncTask cleanupTask = remoteClusterStateCleanupManager.getStaleFileDeletionTask();
-        assertNull(cleanupTask);
-        // now the task should be initialized
-        remoteClusterStateCleanupManager.start();
-        assertNotNull(remoteClusterStateCleanupManager.getStaleFileDeletionTask());
-        assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().mustReschedule());
-        assertEquals(
-            clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING),
-            remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval()
-        );
-        assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());
-        assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isClosed());
-    }
-
-    public void testRemoteCleanupSkipsOnOnlyElectedClusterManager() {
-        DiscoveryNodes nodes = mock(DiscoveryNodes.class);
-        when(nodes.isLocalNodeElectedClusterManager()).thenReturn(false);
-        when(clusterState.nodes()).thenReturn(nodes);
-        RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager);
-        AtomicInteger callCount = new AtomicInteger(0);
-        doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt());
-        spyManager.cleanUpStaleFiles();
-        assertEquals(0, callCount.get());
-
-        when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true);
-        when(clusterState.version()).thenReturn(randomLongBetween(11, 20));
-        spyManager.cleanUpStaleFiles();
-        assertEquals(1, callCount.get());
-    }
-
-    public void testRemoteCleanupSkipsIfVersionIncrementLessThanThreshold() {
-        DiscoveryNodes nodes = mock(DiscoveryNodes.class);
-        long version = randomLongBetween(1, SKIP_CLEANUP_STATE_CHANGES);
-        when(clusterApplierService.state()).thenReturn(clusterState);
-        when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true);
-        when(clusterState.nodes()).thenReturn(nodes);
-        when(clusterState.version()).thenReturn(version);
-
-        RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager);
-        AtomicInteger callCount = new AtomicInteger(0);
-        doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt());
-
-        remoteClusterStateCleanupManager.cleanUpStaleFiles();
-        assertEquals(0, callCount.get());
-    }
-
-    public void testRemoteCleanupCallsDeleteIfVersionIncrementGreaterThanThreshold() {
-        DiscoveryNodes nodes = mock(DiscoveryNodes.class);
-        long version = randomLongBetween(SKIP_CLEANUP_STATE_CHANGES + 1, SKIP_CLEANUP_STATE_CHANGES + 10);
-        when(clusterApplierService.state()).thenReturn(clusterState);
-        when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true);
-        when(clusterState.nodes()).thenReturn(nodes);
-        when(clusterState.version()).thenReturn(version);
-
-        RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager);
-        AtomicInteger callCount = new AtomicInteger(0);
-        doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt());
-
-        // using spied cleanup manager so that stubbed deleteStaleClusterMetadata is called
-        spyManager.cleanUpStaleFiles();
-        assertEquals(1, callCount.get());
-    }
-
-    public void testRemoteCleanupSchedulesEvenAfterFailure() {
-        remoteClusterStateCleanupManager.start();
-        RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager);
-        AtomicInteger callCount = new AtomicInteger(0);
-        doAnswer(invocationOnMock -> {
-            callCount.incrementAndGet();
-            throw new RuntimeException("Test exception");
-        }).when(spyManager).cleanUpStaleFiles();
-        AsyncStaleFileDeletion task = new AsyncStaleFileDeletion(spyManager);
-        assertTrue(task.isScheduled());
-        task.run();
-        // Task is still scheduled after the failure
-        assertTrue(task.isScheduled());
-        assertEquals(1, callCount.get());
-
-        task.run();
-        // Task is still scheduled after the failure
-        assertTrue(task.isScheduled());
-        assertEquals(2, callCount.get());
-    }
-}
diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
index 5f0c371a3137e..1b242b921c0d7 100644
--- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
+++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
@@ -19,7 +19,6 @@
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.metadata.TemplatesMetadata;
 import org.opensearch.cluster.node.DiscoveryNodes;
-import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.blobstore.BlobMetadata;
@@ -73,6 +72,9 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -91,6 +93,7 @@
 import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;
 import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
@@ -106,12 +109,13 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class RemoteClusterStateServiceTests extends OpenSearchTestCase {
 
     private RemoteClusterStateService remoteClusterStateService;
-    private ClusterService clusterService;
     private ClusterSettings clusterSettings;
     private Supplier<RepositoriesService> repositoriesServiceSupplier;
     private RepositoriesService repositoriesService;
@@ -144,8 +148,6 @@ public void setup() {
             .build();
 
         clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
-        clusterService = mock(ClusterService.class);
-        when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
         NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(
             Stream.of(
                 NetworkModule.getNamedXContents().stream(),
@@ -163,7 +165,7 @@ public void setup() {
             "test-node-id",
             repositoriesServiceSupplier,
             settings,
-            clusterService,
+            clusterSettings,
             () -> 0L,
             threadPool,
             List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings))
@@ -185,14 +187,14 @@ public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException
 
     public void testFailInitializationWhenRemoteStateDisabled() {
         final Settings settings = Settings.builder().build();
-        when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
+        ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
         assertThrows(
             AssertionError.class,
             () -> new RemoteClusterStateService(
                 "test-node-id",
                 repositoriesServiceSupplier,
                 settings,
-                clusterService,
+                clusterSettings,
                 () -> 0L,
                 threadPool,
                 List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings))
@@ -1278,6 +1280,72 @@ public void testGetValidPreviousClusterUUIDWhenLastUUIDUncommitted() throws IOEx
         assertThat(previousClusterUUID, equalTo("cluster-uuid2"));
     }
 
+    public void testDeleteStaleClusterUUIDs() throws IOException {
+        final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+        ClusterMetadataManifest clusterMetadataManifest = ClusterMetadataManifest.builder()
+            .indices(List.of())
+            .clusterTerm(1L)
+            .stateVersion(1L)
+            .stateUUID(randomAlphaOfLength(10))
+            .clusterUUID("cluster-uuid1")
+            .nodeId("nodeA")
+            .opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
+            .previousClusterUUID(ClusterState.UNKNOWN_UUID)
+            .committed(true)
+            .build();
+
+        BlobPath blobPath = new BlobPath().add("random-path");
+        when((blobStoreRepository.basePath())).thenReturn(blobPath);
+        BlobContainer uuidContainerContainer = mock(BlobContainer.class);
+        BlobContainer manifest2Container = mock(BlobContainer.class);
+        BlobContainer manifest3Container = mock(BlobContainer.class);
+        when(blobStore.blobContainer(any())).then(invocation -> {
+            BlobPath blobPath1 = invocation.getArgument(0);
+            if (blobPath1.buildAsString().endsWith("cluster-state/")) {
+                return uuidContainerContainer;
+            } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid2/")) {
+                return manifest2Container;
+            } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid3/")) {
+                return manifest3Container;
+            } else {
+                throw new IllegalArgumentException("Unexpected blob path " + blobPath1);
+            }
+        });
+        Map<String, BlobContainer> blobMetadataMap = Map.of(
+            "cluster-uuid1",
+            mock(BlobContainer.class),
+            "cluster-uuid2",
+            mock(BlobContainer.class),
+            "cluster-uuid3",
+            mock(BlobContainer.class)
+        );
+        when(uuidContainerContainer.children()).thenReturn(blobMetadataMap);
+        when(
+            manifest2Container.listBlobsByPrefixInSortedOrder(
+                MANIFEST_FILE_PREFIX + DELIMITER,
+                Integer.MAX_VALUE,
+                BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
+            )
+        ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L)));
+        when(
+            manifest3Container.listBlobsByPrefixInSortedOrder(
+                MANIFEST_FILE_PREFIX + DELIMITER,
+                Integer.MAX_VALUE,
+                BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
+            )
+        ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L)));
+        remoteClusterStateService.start();
+        remoteClusterStateService.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest);
+        try {
+            assertBusy(() -> {
+                verify(manifest2Container, times(1)).delete();
+                verify(manifest3Container, times(1)).delete();
+            });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public void testRemoteStateStats() throws IOException {
         final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
         mockBlobStoreObjects();
@@ -1290,6 +1358,26 @@ public void testRemoteStateStats() throws IOException {
         assertEquals(0, remoteClusterStateService.getStats().getFailedCount());
     }
 
+    public void testRemoteStateCleanupFailureStats() throws IOException {
+        BlobContainer blobContainer = mock(BlobContainer.class);
+        doThrow(IOException.class).when(blobContainer).delete();
+        when(blobStore.blobContainer(any())).thenReturn(blobContainer);
+        BlobPath blobPath = new BlobPath().add("random-path");
+        when((blobStoreRepository.basePath())).thenReturn(blobPath);
+        remoteClusterStateService.start();
+        remoteClusterStateService.deleteStaleUUIDsClusterMetadata("cluster1", Arrays.asList("cluster-uuid1"));
+        try {
+            assertBusy(() -> {
+                // wait for stats to get updated
+                assertTrue(remoteClusterStateService.getStats() != null);
+                assertEquals(0, remoteClusterStateService.getStats().getSuccessCount());
+                assertEquals(1, remoteClusterStateService.getStats().getCleanupAttemptFailedCount());
+            });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public void testFileNames() {
         final Index index = new Index("test-index", "index-uuid");
         final Settings idxSettings = Settings.builder()
@@ -1330,6 +1418,36 @@ private void verifyManifestFileNameWithCodec(int codecVersion) {
         assertThat(splittedName[3], is("P"));
     }
 
+    public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception {
+        BlobContainer blobContainer = mock(BlobContainer.class);
+        BlobPath blobPath = new BlobPath().add("random-path");
+        when((blobStoreRepository.basePath())).thenReturn(blobPath);
+        when(blobStore.blobContainer(any())).thenReturn(blobContainer);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicInteger callCount = new AtomicInteger(0);
+        doAnswer(invocation -> {
+            callCount.incrementAndGet();
+            if (latch.await(5000, TimeUnit.SECONDS) == false) {
+                throw new Exception("Timed out waiting for delete task queuing to complete");
+            }
+            return null;
+        }).when(blobContainer)
+            .listBlobsByPrefixInSortedOrder(
+                any(String.class),
+                any(int.class),
+                any(BlobContainer.BlobNameSortOrder.class),
+                any(ActionListener.class)
+            );
+
+        remoteClusterStateService.start();
+        remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
+        remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
+
+        latch.countDown();
+        assertBusy(() -> assertEquals(1, callCount.get()));
+    }
+
     public void testIndexMetadataUploadWaitTimeSetting() {
         // verify default value
         assertEquals(
@@ -1773,7 +1891,7 @@ private static ClusterState.Builder generateClusterStateWithGlobalMetadata() {
             );
     }
 
-    static ClusterState.Builder generateClusterStateWithOneIndex() {
+    private static ClusterState.Builder generateClusterStateWithOneIndex() {
         final Index index = new Index("test-index", "index-uuid");
         final Settings idxSettings = Settings.builder()
             .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
@@ -1803,7 +1921,7 @@ static ClusterState.Builder generateClusterStateWithOneIndex() {
             );
     }
 
-    static DiscoveryNodes nodesWithLocalNodeClusterManager() {
+    private static DiscoveryNodes nodesWithLocalNodeClusterManager() {
         return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").build();
     }