Skip to content

Commit

Permalink
Decouple remote state configuration (opensearch-project#11858)
Browse files Browse the repository at this point in the history
* Decouple remote state configuration

Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha authored and rayshrey committed Mar 18, 2024
1 parent cef6136 commit 2e6dc0d
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING;
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;

/**
* Service responsible for submitting create index requests
Expand Down Expand Up @@ -971,7 +971,7 @@ private static void updateReplicationStrategy(
indexReplicationType = INDEX_REPLICATION_TYPE_SETTING.get(combinedTemplateSettings);
} else if (CLUSTER_REPLICATION_TYPE_SETTING.exists(clusterSettings)) {
indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.get(clusterSettings);
} else if (isRemoteStoreAttributePresent(clusterSettings)) {
} else if (isRemoteDataAttributePresent(clusterSettings)) {
indexReplicationType = ReplicationType.SEGMENT;
} else {
indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.getDefault(clusterSettings);
Expand All @@ -985,7 +985,7 @@ private static void updateReplicationStrategy(
* @param clusterSettings cluster level settings
*/
private static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings clusterSettings) {
if (isRemoteStoreAttributePresent(clusterSettings)) {
if (isRemoteDataAttributePresent(clusterSettings)) {
settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true)
.put(
SETTING_REMOTE_SEGMENT_STORE_REPOSITORY,
Expand Down Expand Up @@ -1577,7 +1577,7 @@ public static void validateRefreshIntervalSettings(Settings requestSettings, Clu
* @param clusterSettings cluster setting
*/
static void validateTranslogDurabilitySettings(Settings requestSettings, ClusterSettings clusterSettings, Settings settings) {
if (isRemoteStoreAttributePresent(settings) == false
if (isRemoteDataAttributePresent(settings) == false
|| IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.exists(requestSettings) == false
|| clusterSettings.get(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING) == false) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,8 @@ private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String na
}

private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {
Set<String> repositoryNames = getValidatedRepositoryNames(node);
List<RepositoryMetadata> repositoryMetadataList = new ArrayList<>();
Set<String> repositoryNames = new HashSet<>();

repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));

for (String repositoryName : repositoryNames) {
repositoryMetadataList.add(buildRepositoryMetadata(node, repositoryName));
Expand All @@ -145,12 +141,36 @@ private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {
return new RepositoriesMetadata(repositoryMetadataList);
}

private Set<String> getValidatedRepositoryNames(DiscoveryNode node) {
Set<String> repositoryNames = new HashSet<>();
if (node.getAttributes().containsKey(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY)
|| node.getAttributes().containsKey(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
} else if (node.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
}
return repositoryNames;
}

public static boolean isRemoteStoreAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty() == false;
}

public static boolean isRemoteDataAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false
|| settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false;
}

public static boolean isRemoteClusterStateAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)
.isEmpty() == false;
}

public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteStoreAttributePresent(settings);
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings)
&& isRemoteClusterStateAttributePresent(settings);
}

public RepositoriesMetadata getRepositoriesMetadata() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ public void testJoinClusterWithNonRemoteStoreNodeJoining() {
}

public void testJoinClusterWithRemoteStoreNodeJoining() {
DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO));
Map<String, String> map = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
DiscoveryNode joiningNode = newDiscoveryNode(map);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(joiningNode).build())
.build();
Expand Down Expand Up @@ -582,12 +583,94 @@ public void testPreventJoinClusterWithRemoteStoreNodeWithPartialAttributesJoinin
);
assertTrue(
e.getMessage().equals("joining node [" + joiningNode + "] doesn't have the node attribute [" + nodeAttribute.getKey() + "]")
|| e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);

remoteStoreNodeAttributes.put(nodeAttribute.getKey(), nodeAttribute.getValue());
}
}

public void testJoinClusterWithRemoteStateNodeJoiningRemoteStateCluster() {
Map<String, String> existingNodeAttributes = remoteStateNodeAttributes(CLUSTER_STATE_REPO);
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
existingNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();
DiscoveryNode joiningNode = newDiscoveryNode(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

public void testPreventJoinClusterWithRemoteStateNodeJoiningRemoteStoreCluster() {
Map<String, String> existingNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
existingNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();
DiscoveryNode joiningNode = newDiscoveryNode(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
Exception e = assertThrows(
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(
e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);
}

public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster() {
Map<String, String> existingNodeAttributes = remoteStateNodeAttributes(CLUSTER_STATE_REPO);
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
existingNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();
DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO));
Exception e = assertThrows(
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(
e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);
}

public void testUpdatesClusterStateWithSingleNodeCluster() throws Exception {
Map<String, String> remoteStoreNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
final AllocationService allocationService = mock(AllocationService.class);
Expand Down Expand Up @@ -869,6 +952,23 @@ private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, St
REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
translogRepoName
);

return new HashMap<>() {
{
put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName);
put(segmentRepositoryTypeAttributeKey, "s3");
put(segmentRepositorySettingsAttributeKeyPrefix + "bucket", "segment_bucket");
put(segmentRepositorySettingsAttributeKeyPrefix + "base_path", "/segment/path");
put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName);
putIfAbsent(translogRepositoryTypeAttributeKey, "s3");
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "bucket", "translog_bucket");
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "base_path", "/translog/path");
putAll(remoteStateNodeAttributes(clusterStateRepo));
}
};
}

private Map<String, String> remoteStateNodeAttributes(String clusterStateRepo) {
String clusterStateRepositoryTypeAttributeKey = String.format(
Locale.getDefault(),
REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
Expand All @@ -882,14 +982,6 @@ private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, St

return new HashMap<>() {
{
put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName);
put(segmentRepositoryTypeAttributeKey, "s3");
put(segmentRepositorySettingsAttributeKeyPrefix + "bucket", "segment_bucket");
put(segmentRepositorySettingsAttributeKeyPrefix + "base_path", "/segment/path");
put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName);
putIfAbsent(translogRepositoryTypeAttributeKey, "s3");
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "bucket", "translog_bucket");
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "base_path", "/translog/path");
put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, clusterStateRepo);
putIfAbsent(clusterStateRepositoryTypeAttributeKey, "s3");
putIfAbsent(clusterStateRepositorySettingsAttributeKeyPrefix + "bucket", "state_bucket");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1901,7 +1901,7 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() {
request,
Settings.EMPTY,
null,
Settings.builder().put("node.attr.remote_store.setting", "test").build(),
Settings.builder().put("node.attr.remote_store.segment.repository", "test").build(),
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
randomShardLimitService(),
Collections.emptySet(),
Expand Down

0 comments on commit 2e6dc0d

Please sign in to comment.