Skip to content

Commit

Permalink
relaxing the join validation for nodes which have only store disabled…
Browse files Browse the repository at this point in the history
… but only publication enabled (opensearch-project#15471)

Signed-off-by: Rajiv Kumar Vaidyanathan <[email protected]>
  • Loading branch information
rajiv-kv authored and dk2k committed Oct 16, 2024
1 parent 0532c68 commit a9293fb
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010))
- ClusterManagerTaskThrottler Improvements ([#15508](https://github.com/opensearch-project/OpenSearch/pull/15508))
- Reset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131))
- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@

import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT;
Expand Down Expand Up @@ -458,7 +459,7 @@ public static void ensureNodesCompatibility(
);
}

ensureRemoteStoreNodesCompatibility(joiningNode, currentNodes, metadata);
ensureRemoteRepositoryCompatibility(joiningNode, currentNodes, metadata);
}

/**
Expand Down Expand Up @@ -491,6 +492,30 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
}
}

public static void ensureRemoteRepositoryCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes, Metadata metadata) {
List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());

boolean isClusterRemoteStoreEnabled = existingNodes.stream().anyMatch(DiscoveryNode::isRemoteStoreNode);
if (isClusterRemoteStoreEnabled || joiningNode.isRemoteStoreNode()) {
ensureRemoteStoreNodesCompatibility(joiningNode, currentNodes, metadata);
} else {
ensureRemoteClusterStateNodesCompatibility(joiningNode, currentNodes);
}
}

private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes) {
List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());

assert existingNodes.isEmpty() == false;
Optional<DiscoveryNode> remotePublicationNode = existingNodes.stream()
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
.findFirst();

if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) {
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES);
}
}

/**
* The method ensures homogeneity -
* 1. The joining node has to be a remote store backed if it's joining a remote store backed cluster. Validates
Expand All @@ -506,6 +531,7 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
* needs to be modified.
*/
private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes, Metadata metadata) {

List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());

assert existingNodes.isEmpty() == false;
Expand Down Expand Up @@ -587,6 +613,23 @@ private static void ensureRemoteStoreNodesCompatibility(
}
}

private static void ensureRepositoryCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode, List<String> reposToValidate) {

RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);

if (existingRemoteStoreNodeAttribute.equalsForRepositories(joiningRemoteStoreNodeAttribute, reposToValidate) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in "
+ "comparison with existing node ["
+ existingNode
+ "]"
);
}
}

public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators(
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,24 @@ public boolean equalsIgnoreGenerationsWithRepoSkip(@Nullable RepositoriesMetadat
.filter(repo -> !reposToSkip.contains(repo.name()))
.collect(Collectors.toList());

return equalsRepository(currentRepositories, otherRepositories);
}

public boolean equalsIgnoreGenerationsForRepo(@Nullable RepositoriesMetadata other, List<String> reposToValidate) {
if (other == null) {
return false;
}
List<RepositoryMetadata> currentRepositories = repositories.stream()
.filter(repo -> reposToValidate.contains(repo.name()))
.collect(Collectors.toList());
List<RepositoryMetadata> otherRepositories = other.repositories.stream()
.filter(repo -> reposToValidate.contains(repo.name()))
.collect(Collectors.toList());

return equalsRepository(currentRepositories, otherRepositories);
}

public static boolean equalsRepository(List<RepositoryMetadata> currentRepositories, List<RepositoryMetadata> otherRepositories) {
if (otherRepositories.size() != currentRepositories.size()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;

/**
* A discovery node represents a node that is part of the cluster.
Expand Down Expand Up @@ -509,7 +510,8 @@ public boolean isSearchNode() {
* @return true if the node contains remote store node attributes, false otherwise
*/
public boolean isRemoteStoreNode() {
return this.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX));
return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))
&& this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public class RemoteStoreNodeAttribute {
REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY
);

public static List<String> REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES = List.of(
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
);

/**
* Creates a new {@link RemoteStoreNodeAttribute}
*/
Expand Down Expand Up @@ -261,6 +266,14 @@ public boolean equalsWithRepoSkip(Object o, List<String> reposToSkip) {
return this.getRepositoriesMetadata().equalsIgnoreGenerationsWithRepoSkip(that.getRepositoriesMetadata(), reposToSkip);
}

public boolean equalsForRepositories(Object otherNode, List<String> repositoryToValidate) {
if (this == otherNode) return true;
if (otherNode == null || getClass() != otherNode.getClass()) return false;

RemoteStoreNodeAttribute other = (RemoteStoreNodeAttribute) otherNode;
return this.getRepositoriesMetadata().equalsIgnoreGenerationsForRepo(other.repositoriesMetadata, repositoryToValidate);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithRemoteStoreSettings;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
Expand Down Expand Up @@ -901,6 +902,7 @@ public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersion

private Map<String, String> getRemoteStoreNodeAttributes() {
Map<String, String> remoteStoreNodeAttributes = new HashMap<>();
remoteStoreNodeAttributes.put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-cluster-repo-1");
remoteStoreNodeAttributes.put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1");
remoteStoreNodeAttributes.put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-translog-repo-1");
return remoteStoreNodeAttributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,100 @@ public void testJoinClusterWithRemoteStateNodeJoiningRemoteStateCluster() {
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

public void testJoinRemotePublicationClusterWithNonRemoteNodes() {
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remotePublicationNodeAttributes(),
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();

DiscoveryNode joiningNode = newDiscoveryNode(new HashMap<>());
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

public void testJoinRemotePublicationCluster() {
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remotePublicationNodeAttributes(),
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();

DiscoveryNode joiningNode = newDiscoveryNode(remotePublicationNodeAttributes());
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

public void testJoinRemotePubClusterWithRemoteStoreNodes() {
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remotePublicationNodeAttributes(),
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();

Map<String, String> newNodeAttributes = new HashMap<>();
newNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
newNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));
newNodeAttributes.putAll(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO));

DiscoveryNode joiningNode = newDiscoveryNode(newNodeAttributes);
Exception e = assertThrows(
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(e.getMessage().equals("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"));
}

public void testPreventJoinRemotePublicationClusterWithIncompatibleAttributes() {
Map<String, String> existingNodeAttributes = remotePublicationNodeAttributes();
Map<String, String> remoteStoreNodeAttributes = remotePublicationNodeAttributes();
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
existingNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();

for (Map.Entry<String, String> nodeAttribute : existingNodeAttributes.entrySet()) {
remoteStoreNodeAttributes.put(nodeAttribute.getKey(), null);
DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes);
Exception e = assertThrows(
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(
e.getMessage().equals("joining node [" + joiningNode + "] doesn't have the node attribute [" + nodeAttribute.getKey() + "]")
|| e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);

remoteStoreNodeAttributes.put(nodeAttribute.getKey(), nodeAttribute.getValue());
}
}

public void testPreventJoinClusterWithRemoteStateNodeJoiningRemoteStoreCluster() {
Map<String, String> existingNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
final DiscoveryNode existingNode = new DiscoveryNode(
Expand All @@ -628,16 +722,7 @@ public void testPreventJoinClusterWithRemoteStateNodeJoiningRemoteStoreCluster()
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(
e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);
assertTrue(e.getMessage().equals("a non remote store node [" + joiningNode + "] is trying to join a remote store cluster"));
}

public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster() {
Expand All @@ -657,16 +742,7 @@ public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster()
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(
e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);
assertTrue(e.getMessage().equals("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"));
}

public void testUpdatesClusterStateWithSingleNodeCluster() throws Exception {
Expand Down Expand Up @@ -1077,6 +1153,39 @@ public void testRemoteRoutingTableNodeJoinNodeWithRemoteAndRoutingRepoDifference
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

public void testJoinRemoteStoreClusterWithRemotePublicationNodeInMixedMode() {
final DiscoveryNode remoteStoreNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO),
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);

final DiscoveryNode nonRemoteStoreNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
new HashMap<>(),
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);

final Settings settings = Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE)
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
.build();
final Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);
Metadata metadata = Metadata.builder().persistentSettings(settings).build();
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(remoteStoreNode).add(nonRemoteStoreNode).localNodeId(remoteStoreNode.getId()).build())
.metadata(metadata)
.build();

DiscoveryNode joiningNode = newDiscoveryNode(remotePublicationNodeAttributes());
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories)
throws Exception {

Expand Down Expand Up @@ -1115,6 +1224,7 @@ private DiscoveryNode newDiscoveryNode(Map<String, String> attributes) {
}

private static final String SEGMENT_REPO = "segment-repo";

private static final String TRANSLOG_REPO = "translog-repo";
private static final String CLUSTER_STATE_REPO = "cluster-state-repo";
private static final String COMMON_REPO = "remote-repo";
Expand Down Expand Up @@ -1161,6 +1271,13 @@ private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, St
};
}

private Map<String, String> remotePublicationNodeAttributes() {
Map<String, String> existingNodeAttributes = new HashMap<>();
existingNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
existingNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));
return existingNodeAttributes;
}

private Map<String, String> remoteStateNodeAttributes(String clusterStateRepo) {
String clusterStateRepositoryTypeAttributeKey = String.format(
Locale.getDefault(),
Expand Down
Loading

0 comments on commit a9293fb

Please sign in to comment.