Skip to content

Commit

Permalink
fix stale remote cluster uuid state not purged from remote (opensearc…
Browse files Browse the repository at this point in the history
…h-project#10016)

* fix stale remote cluster uuid state not purged from remote

Signed-off-by: bansvaru <[email protected]>

* fix tests

Signed-off-by: bansvaru <[email protected]>

* use new limit parameter

Signed-off-by: bansvaru <[email protected]>

* minor refactoring

Signed-off-by: bansvaru <[email protected]>

* delete index metadata files before manifest file

Signed-off-by: bansvaru <[email protected]>

* add basic UT

Signed-off-by: bansvaru <[email protected]>

* delete all data related to a cluster uuid in a single call

Signed-off-by: bansvaru <[email protected]>

* fix git diff

Signed-off-by: bansvaru <[email protected]>

* remove unreferenced code

Signed-off-by: bansvaru <[email protected]>

* fix spa

Signed-off-by: bansvaru <[email protected]>

---------

Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi authored Oct 5, 2023
1 parent 211578c commit 1eec0f1
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ public class RemoteClusterStateService implements Closeable {
Property.Final
);

private static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state";
private static final String INDEX_PATH_TOKEN = "index";
private static final String MANIFEST_PATH_TOKEN = "manifest";
private static final String MANIFEST_FILE_PREFIX = "manifest";
private static final String INDEX_METADATA_FILE_PREFIX = "metadata";
public static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state";
public static final String INDEX_PATH_TOKEN = "index";
public static final String MANIFEST_PATH_TOKEN = "manifest";
public static final String MANIFEST_FILE_PREFIX = "manifest";
public static final String INDEX_METADATA_FILE_PREFIX = "metadata";

private final String nodeId;
private final Supplier<RepositoriesService> repositoriesService;
Expand Down Expand Up @@ -385,13 +385,20 @@ private void writeIndexMetadataAsync(
@Nullable
public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest)
throws IOException {
assert clusterState != null : "Last accepted cluster state is not set";
if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
logger.error("Local node is not elected cluster manager. Exiting");
return null;
}
assert clusterState != null : "Last accepted cluster state is not set";
assert previousManifest != null : "Last cluster metadata manifest is not set";
return uploadManifest(clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), true);
ClusterMetadataManifest committedManifest = uploadManifest(
clusterState,
previousManifest.getIndices(),
previousManifest.getPreviousClusterUUID(),
true
);
deleteStaleClusterUUIDs(clusterState, committedManifest);
return committedManifest;
}

@Override
Expand Down Expand Up @@ -719,30 +726,42 @@ private boolean isInvalidClusterUUID(ClusterMetadataManifest manifest) {
}

/**
* Fetch latest ClusterMetadataManifest file from remote state store
* Fetch ClusterMetadataManifest files from remote state store in order
*
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return latest ClusterMetadataManifest filename
* @param limit max no of files to fetch
* @return all manifest file names
*/
private Optional<String> getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException {
private List<BlobMetadata> getManifestFileNames(String clusterName, String clusterUUID, int limit) throws IllegalStateException {
try {
/**
* {@link BlobContainer#listBlobsByPrefixInSortedOrder} will get the latest manifest file
* {@link BlobContainer#listBlobsByPrefixInSortedOrder} will list the latest manifest file first
* as the manifest file name generated via {@link RemoteClusterStateService#getManifestFileName} ensures
* when sorted in LEXICOGRAPHIC order the latest uploaded manifest file comes on top.
*/
List<BlobMetadata> manifestFilesMetadata = manifestContainer(clusterName, clusterUUID).listBlobsByPrefixInSortedOrder(
return manifestContainer(clusterName, clusterUUID).listBlobsByPrefixInSortedOrder(
MANIFEST_FILE_PREFIX + DELIMITER,
1,
limit,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
);
if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) {
return Optional.of(manifestFilesMetadata.get(0).name());
}
} catch (IOException e) {
throw new IllegalStateException("Error while fetching latest manifest file for remote cluster state", e);
}
}

/**
* Fetch latest ClusterMetadataManifest file from remote state store
*
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return latest ClusterMetadataManifest filename
*/
private Optional<String> getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException {
List<BlobMetadata> manifestFilesMetadata = getManifestFileNames(clusterName, clusterUUID, 1);
if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) {
return Optional.of(manifestFilesMetadata.get(0).name());
}
logger.info("No manifest file present in remote store for cluster name: {}, cluster UUID: {}", clusterName, clusterUUID);
return Optional.empty();
}
Expand Down Expand Up @@ -791,7 +810,7 @@ public IndexMetadataTransferException(String errorDesc, Throwable cause) {
* @param clusterName name of the cluster
* @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged
*/
public void deleteStaleClusterMetadata(String clusterName, List<String> clusterUUIDs) {
private void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUIDs) {
clusterUUIDs.forEach(clusterUUID -> {
getBlobStoreTransferService().deleteAsync(
ThreadPool.Names.REMOTE_PURGE,
Expand Down Expand Up @@ -923,4 +942,27 @@ private void deleteStalePaths(String clusterName, String clusterUUID, List<Strin
logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
getBlobStoreTransferService().deleteBlobs(getCusterMetadataBasePath(clusterName, clusterUUID), stalePaths);
}

/**
* Purges all remote cluster state against provided cluster UUIDs
* @param clusterState current state of the cluster
* @param committedManifest last committed ClusterMetadataManifest
*/
public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) {
threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
String clusterName = clusterState.getClusterName().value();
logger.info("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
Set<String> allClustersUUIDsInRemote;
try {
allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value()));
} catch (IOException e) {
logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName));
return;
}
// Retain last 2 cluster uuids data
allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID());
allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID());
deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;

import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
Expand All @@ -76,6 +78,8 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class RemoteClusterStateServiceTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -334,13 +338,8 @@ public void testReadLatestMetadataManifestFailedIOException() throws IOException
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();

BlobContainer blobContainer = mockBlobStoreObjects();
when(
blobContainer.listBlobsByPrefixInSortedOrder(
"manifest" + RemoteClusterStateService.DELIMITER,
1,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
)
).thenThrow(IOException.class);
when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC))
.thenThrow(IOException.class);

remoteClusterStateService.start();
Exception e = assertThrows(
Expand All @@ -357,13 +356,8 @@ public void testReadLatestMetadataManifestFailedNoManifestFileInRemote() throws
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();

BlobContainer blobContainer = mockBlobStoreObjects();
when(
blobContainer.listBlobsByPrefixInSortedOrder(
"manifest" + RemoteClusterStateService.DELIMITER,
1,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
)
).thenReturn(List.of());
when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC))
.thenReturn(List.of());

remoteClusterStateService.start();
Optional<ClusterMetadataManifest> manifest = remoteClusterStateService.getLatestClusterMetadataManifest(
Expand All @@ -378,13 +372,8 @@ public void testReadLatestMetadataManifestFailedManifestFileRemoveAfterFetchInRe

BlobContainer blobContainer = mockBlobStoreObjects();
BlobMetadata blobMetadata = new PlainBlobMetadata("manifestFileName", 1);
when(
blobContainer.listBlobsByPrefixInSortedOrder(
"manifest" + RemoteClusterStateService.DELIMITER,
1,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
)
).thenReturn(Arrays.asList(blobMetadata));
when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC))
.thenReturn(Arrays.asList(blobMetadata));
when(blobContainer.readBlob("manifestFileName")).thenThrow(FileNotFoundException.class);

remoteClusterStateService.start();
Expand Down Expand Up @@ -618,6 +607,72 @@ public void testGetValidPreviousClusterUUIDWithInvalidMultipleChains() throws IO
assertThrows(IllegalStateException.class, () -> remoteClusterStateService.getLastKnownUUIDFromRemote("test-cluster"));
}

public void testDeleteStaleClusterUUIDs() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
ClusterMetadataManifest clusterMetadataManifest = ClusterMetadataManifest.builder()
.indices(List.of())
.clusterTerm(1L)
.stateVersion(1L)
.stateUUID(randomAlphaOfLength(10))
.clusterUUID("cluster-uuid1")
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID(ClusterState.UNKNOWN_UUID)
.committed(true)
.build();

BlobPath blobPath = new BlobPath().add("random-path");
when((blobStoreRepository.basePath())).thenReturn(blobPath);
BlobContainer uuidContainerContainer = mock(BlobContainer.class);
BlobContainer manifest2Container = mock(BlobContainer.class);
BlobContainer manifest3Container = mock(BlobContainer.class);
when(blobStore.blobContainer(any())).then(invocation -> {
BlobPath blobPath1 = invocation.getArgument(0);
if (blobPath1.buildAsString().endsWith("cluster-state/")) {
return uuidContainerContainer;
} else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid2/")) {
return manifest2Container;
} else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid3/")) {
return manifest3Container;
} else {
throw new IllegalArgumentException("Unexpected blob path " + blobPath1);
}
});
Map<String, BlobContainer> blobMetadataMap = Map.of(
"cluster-uuid1",
mock(BlobContainer.class),
"cluster-uuid2",
mock(BlobContainer.class),
"cluster-uuid3",
mock(BlobContainer.class)
);
when(uuidContainerContainer.children()).thenReturn(blobMetadataMap);
when(
manifest2Container.listBlobsByPrefixInSortedOrder(
MANIFEST_FILE_PREFIX + DELIMITER,
Integer.MAX_VALUE,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
)
).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L)));
when(
manifest3Container.listBlobsByPrefixInSortedOrder(
MANIFEST_FILE_PREFIX + DELIMITER,
Integer.MAX_VALUE,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
)
).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L)));
remoteClusterStateService.start();
remoteClusterStateService.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest);
try {
assertBusy(() -> {
verify(manifest2Container, times(1)).delete();
verify(manifest3Container, times(1)).delete();
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void mockObjectsForGettingPreviousClusterUUID(Map<String, String> clusterUUIDsPointers) throws IOException {
final BlobPath blobPath = mock(BlobPath.class);
when((blobStoreRepository.basePath())).thenReturn(blobPath);
Expand Down Expand Up @@ -760,13 +815,8 @@ private void mockBlobContainer(
Map<String, IndexMetadata> indexMetadataMap
) throws IOException {
BlobMetadata blobMetadata = new PlainBlobMetadata("manifestFileName", 1);
when(
blobContainer.listBlobsByPrefixInSortedOrder(
"manifest" + RemoteClusterStateService.DELIMITER,
1,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
)
).thenReturn(Arrays.asList(blobMetadata));
when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC))
.thenReturn(Arrays.asList(blobMetadata));

BytesReference bytes = RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.serialize(
clusterMetadataManifest,
Expand Down

0 comments on commit 1eec0f1

Please sign in to comment.