Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into skip_awareness_with_auto_…
Browse files Browse the repository at this point in the history
…expand_all
  • Loading branch information
amberzsy authored Sep 13, 2024
2 parents a27c4e3 + 4223fab commit 74ae2bc
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -75,25 +77,46 @@ public RemoteStorePinnedTimestampService(
* and starts the asynchronous update task.
*/
public void start() {
validateRemoteStoreConfiguration();
blobContainer = validateAndCreateBlobContainer(settings, repositoriesService.get());
startAsyncUpdateTask(RemoteStoreSettings.getPinnedTimestampsSchedulerInterval());
}

private void validateRemoteStoreConfiguration() {
private static BlobContainer validateAndCreateBlobContainer(Settings settings, RepositoriesService repositoriesService) {
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY
);
assert remoteStoreRepo != null : "Remote Segment Store repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
final Repository repository = repositoriesService.repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
blobContainer = blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath().add(PINNED_TIMESTAMPS_PATH_TOKEN));
return blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath().add(PINNED_TIMESTAMPS_PATH_TOKEN));
}

private void startAsyncUpdateTask(TimeValue pinnedTimestampsSchedulerInterval) {
asyncUpdatePinnedTimestampTask = new AsyncUpdatePinnedTimestampTask(logger, threadPool, pinnedTimestampsSchedulerInterval, true);
}

public static Map<String, Set<Long>> fetchPinnedTimestamps(Settings settings, RepositoriesService repositoriesService)
throws IOException {
BlobContainer blobContainer = validateAndCreateBlobContainer(settings, repositoriesService);
Set<String> pinnedTimestamps = blobContainer.listBlobs().keySet();
Map<String, Set<Long>> pinningEntityTimestampMap = new HashMap<>();
for (String pinnedTimestamp : pinnedTimestamps) {
try {
String[] tokens = pinnedTimestamp.split(PINNED_TIMESTAMPS_FILENAME_SEPARATOR);
Long timestamp = Long.parseLong(tokens[tokens.length - 1]);
String pinningEntity = pinnedTimestamp.substring(0, pinnedTimestamp.lastIndexOf(PINNED_TIMESTAMPS_FILENAME_SEPARATOR));
if (pinningEntityTimestampMap.containsKey(pinningEntity) == false) {
pinningEntityTimestampMap.put(pinningEntity, new HashSet<>());
}
pinningEntityTimestampMap.get(pinningEntity).add(timestamp);
} catch (NumberFormatException e) {
logger.error("Exception while parsing pinned timestamp from {}, skipping this entry", pinnedTimestamp);
}
}
return pinningEntityTimestampMap;
}

/**
* Pins a timestamp in the remote store.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -84,6 +86,7 @@
import java.util.stream.Stream;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;

/**
Expand Down Expand Up @@ -123,6 +126,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
private final RepositoriesStatsArchive repositoriesStatsArchive;
private final ClusterManagerTaskThrottler.ThrottlingKey putRepositoryTaskKey;
private final ClusterManagerTaskThrottler.ThrottlingKey deleteRepositoryTaskKey;
private final Settings settings;

public RepositoriesService(
Settings settings,
Expand All @@ -132,6 +136,7 @@ public RepositoriesService(
Map<String, Repository.Factory> internalTypesRegistry,
ThreadPool threadPool
) {
this.settings = settings;
this.typesRegistry = typesRegistry;
this.internalTypesRegistry = internalTypesRegistry;
this.clusterService = clusterService;
Expand Down Expand Up @@ -173,7 +178,7 @@ public void registerOrUpdateRepository(final PutRepositoryRequest request, final
CryptoMetadata.fromRequest(request.cryptoSettings())
);
validate(request.name());
validateRepositoryMetadataSettings(clusterService, request.name(), request.settings());
validateRepositoryMetadataSettings(clusterService, request.name(), request.settings(), repositories, settings, this);
if (newRepositoryMetadata.cryptoMetadata() != null) {
validate(newRepositoryMetadata.cryptoMetadata().keyProviderName());
}
Expand Down Expand Up @@ -684,7 +689,10 @@ public static void validate(final String identifier) {
public static void validateRepositoryMetadataSettings(
ClusterService clusterService,
final String repositoryName,
final Settings repositoryMetadataSettings
final Settings repositoryMetadataSettings,
Map<String, Repository> repositories,
Settings settings,
RepositoriesService repositoriesService
) {
// We can add more validations here for repository settings in the future.
Version minVersionInCluster = clusterService.state().getNodes().getMinNodeVersion();
Expand All @@ -699,6 +707,51 @@ public static void validateRepositoryMetadataSettings(
+ minVersionInCluster
);
}
if (SHALLOW_SNAPSHOT_V2.get(repositoryMetadataSettings)) {
if (minVersionInCluster.onOrAfter(Version.V_2_17_0) == false) {
throw new RepositoryException(
repositoryName,
"setting "
+ SHALLOW_SNAPSHOT_V2.getKey()
+ " cannot be enabled as some of the nodes in cluster are on version older than "
+ Version.V_2_17_0
+ ". Minimum node version in cluster is: "
+ minVersionInCluster
);
}
if (repositoryName.contains(SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER)) {
throw new RepositoryException(
repositoryName,
"setting "
+ SHALLOW_SNAPSHOT_V2.getKey()
+ " cannot be enabled for repository with "
+ SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER
+ " in the name as this delimiter is used to create pinning entity"
);
}
if (repositoryWithShallowV2Exists(repositories)) {
throw new RepositoryException(
repositoryName,
"setting "
+ SHALLOW_SNAPSHOT_V2.getKey()
+ " cannot be enabled as this setting can be enabled only on one repository "
+ " and one or more repositories in the cluster have the setting as enabled"
);
}
try {
if (pinnedTimestampExistsWithDifferentRepository(repositoryName, settings, repositoriesService)) {
throw new RepositoryException(
repositoryName,
"setting "
+ SHALLOW_SNAPSHOT_V2.getKey()
+ " cannot be enabled if there are existing snapshots created with shallow V2 "
+ "setting using different repository."
);
}
} catch (IOException e) {
throw new RepositoryException(repositoryName, "Exception while fetching pinned timestamp details");
}
}
// Validation to not allow users to create system repository via put repository call.
if (isSystemRepositorySettingPresent(repositoryMetadataSettings)) {
throw new RepositoryException(
Expand All @@ -710,6 +763,28 @@ public static void validateRepositoryMetadataSettings(
}
}

private static boolean repositoryWithShallowV2Exists(Map<String, Repository> repositories) {
return repositories.values().stream().anyMatch(repo -> SHALLOW_SNAPSHOT_V2.get(repo.getMetadata().settings()));
}

private static boolean pinnedTimestampExistsWithDifferentRepository(
String newRepoName,
Settings settings,
RepositoriesService repositoriesService
) throws IOException {
Map<String, Set<Long>> pinningEntityTimestampMap = RemoteStorePinnedTimestampService.fetchPinnedTimestamps(
settings,
repositoriesService
);
for (String pinningEntity : pinningEntityTimestampMap.keySet()) {
String repoNameWithPinnedTimestamps = pinningEntity.split(SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER)[0];
if (repoNameWithPinnedTimestamps.equals(newRepoName) == false) {
return true;
}
}
return false;
}

private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
if (isRepositoryInUse(clusterState, repository)) {
throw new IllegalStateException("trying to modify or unregister repository that is currently used");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void onTaskCompleted(Task task) {
if (!TASKS_TO_TRACK.contains(task.getClass())) {
return;
}
this.cancelledTaskTracker.entrySet().removeIf(entry -> entry.getKey() == task.getId());
this.cancelledTaskTracker.remove(task.getId());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
package org.opensearch.repositories.blobstore;

import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.settings.Settings;
Expand All @@ -41,13 +43,16 @@
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryData;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
Expand All @@ -64,6 +69,9 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

/**
Expand All @@ -81,6 +89,7 @@ protected Settings nodeSettings() {
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo"))
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), tempDir.getParent())
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.build();
}

Expand Down Expand Up @@ -373,4 +382,119 @@ public void testRetrieveShallowCopySnapshotCase2() throws IOException {
assertThat(snapshotIds, equalTo(originalSnapshots));
}

public void testRepositoryCreationShallowV2() throws Exception {
Client client = client();

Settings snapshotRepoSettings1 = Settings.builder()
.put(node().settings())
.put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings()))
.put(REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(SHALLOW_SNAPSHOT_V2.getKey(), true)
.build();

String invalidRepoName = "test" + SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + "repo-1";
try {
createRepository(client, invalidRepoName, snapshotRepoSettings1);
} catch (RepositoryException e) {
assertEquals(
"["
+ invalidRepoName
+ "] setting shallow_snapshot_v2 cannot be enabled for repository with __ in the name as this delimiter is used to create pinning entity",
e.getMessage()
);
}

// Create repo with shallow snapshot V2 enabled
createRepository(client, "test-repo-1", snapshotRepoSettings1);

logger.info("--> verify the repository");
VerifyRepositoryResponse verifyRepositoryResponse = client.admin().cluster().prepareVerifyRepository("test-repo-1").get();
assertNotNull(verifyRepositoryResponse.getNodes());

GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get();
assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));

Settings snapshotRepoSettings2 = Settings.builder()
.put(node().settings())
.put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings()))
.put(REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(SHALLOW_SNAPSHOT_V2.getKey(), true)
.build();

// Create another repo with shallow snapshot V2 enabled, this should fail.
try {
createRepository(client, "test-repo-2", snapshotRepoSettings2);
} catch (RepositoryException e) {
assertEquals(
"[test-repo-2] setting shallow_snapshot_v2 cannot be enabled as this setting can be enabled only on one repository and one or more repositories in the cluster have the setting as enabled",
e.getMessage()
);
}

// Disable shallow snapshot V2 setting on test-repo-1
updateRepository(
client,
"test-repo-1",
Settings.builder().put(snapshotRepoSettings1).put(SHALLOW_SNAPSHOT_V2.getKey(), false).build()
);
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get();
assertFalse(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));

// Create test-repo-2 with shallow snapshot V2 enabled, this should pass now.
createRepository(client, "test-repo-2", snapshotRepoSettings2);
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-2").get();
assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));

final String indexName = "test-idx";
createIndex(indexName);
ensureGreen();
indexDocuments(client, indexName);

// Create pinned timestamp snapshot in test-repo-2
SnapshotInfo snapshotInfo = createSnapshot("test-repo-2", "test-snap-2", new ArrayList<>());
assertNotNull(snapshotInfo.snapshotId());

// As snapshot is present, even after disabling shallow snapshot setting in test-repo-2, we will not be able to
// enable shallow snapshot v2 setting in test-repo-1
updateRepository(
client,
"test-repo-2",
Settings.builder().put(snapshotRepoSettings2).put(SHALLOW_SNAPSHOT_V2.getKey(), false).build()
);
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-2").get();
assertFalse(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));

try {
updateRepository(client, "test-repo-1", snapshotRepoSettings1);
} catch (RepositoryException e) {
assertEquals(
"[test-repo-1] setting shallow_snapshot_v2 cannot be enabled if there are existing snapshots created with shallow V2 setting using different repository.",
e.getMessage()
);
}

// After deleting the snapshot, we will be able to enable shallow snapshot v2 setting in test-repo-1
AcknowledgedResponse deleteSnapshotResponse = client().admin().cluster().prepareDeleteSnapshot("test-repo-2", "test-snap-2").get();

assertAcked(deleteSnapshotResponse);

updateRepository(client, "test-repo-1", snapshotRepoSettings1);
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get();
assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));

// Having a snapshot in the same repo should allow disabling and re-enabling shallow snapshot v2 setting
snapshotInfo = createSnapshot("test-repo-1", "test-snap-1", new ArrayList<>());
assertNotNull(snapshotInfo.snapshotId());
updateRepository(
client,
"test-repo-1",
Settings.builder().put(snapshotRepoSettings1).put(SHALLOW_SNAPSHOT_V2.getKey(), false).build()
);
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get();
assertFalse(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));

updateRepository(client, "test-repo-1", snapshotRepoSettings1);
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get();
assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));
}
}

0 comments on commit 74ae2bc

Please sign in to comment.