
Commit 86ed7cb

WIP
Signed-off-by: Bhumika Saini <[email protected]>
Bhumika Saini committed Apr 1, 2024
1 parent 3907ec9 commit 86ed7cb
Showing 14 changed files with 217 additions and 66 deletions.
@@ -0,0 +1,106 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemotePrimaryLocalRecoveryIT extends MigrationBaseTestCase {
    public void testLocalRecoveryRollingRestart() throws Exception {
        String docRepNode = internalCluster().startNode();
        Client client = internalCluster().client(docRepNode);

        // create an index with 1 shard and 0 replicas
        client().admin().indices().prepareCreate("idx1").setSettings(indexSettings()).setMapping("field", "type=text").get();
        ensureGreen("idx1");

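        // start a background writer so the mode switch, rolling restart, and relocation below all run under live indexing traffic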
        AtomicInteger numAutoGenDocs = new AtomicInteger();
        final AtomicBoolean finished = new AtomicBoolean(false);
        Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);
        refresh("idx1");

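        // switch the cluster to mixed compatibility mode so docrep and remote-store-backed nodes can coexist during migration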
        ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
        updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

        // add a remote-store-backed node to the mixed-mode cluster
        addRemote = true;
        String remoteNode = internalCluster().startNode();
        internalCluster().validateClusterFormed();

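        // point the migration direction at the remote store so shard movements target remote-backed nodes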
        updateSettingsRequest = new ClusterUpdateSettingsRequest();
        updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

        // rolling restart
        internalCluster().rollingRestart(new InternalTestCluster.RestartCallback());
        ensureStableCluster(2);
        ensureGreen("idx1");
        assertEquals(2, internalCluster().size());

        // wait for the background writer to index a few more docs
        final int currentDoc = numAutoGenDocs.get();
        waitUntil(() -> numAutoGenDocs.get() > currentDoc + 5);

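        // relocate the shard from the docrep node to the remote node, then wait for relocation to complete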
        logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
        client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("idx1", 0, docRepNode, remoteNode)).execute().actionGet();
        ClusterHealthResponse clusterHealthResponse = client().admin()
            .cluster()
            .prepareHealth()
            .setTimeout(TimeValue.timeValueSeconds(60))
            .setWaitForEvents(Priority.LANGUID)
            .setWaitForNoRelocatingShards(true)
            .execute()
            .actionGet();
        assertEquals(0, clusterHealthResponse.getRelocatingShards());
        assertEquals(remoteNode, primaryNodeName("idx1"));

        // stop the background writer and wait for it before verifying counts, otherwise the assertions race with in-flight writes
        finished.set(true);
        indexingThread.join();
        refresh("idx1");

        OpenSearchAssertions.assertHitCount(client().prepareSearch("idx1").setTrackTotalHits(true).get(), numAutoGenDocs.get());
        OpenSearchAssertions.assertHitCount(
            client().prepareSearch("idx1").setTrackTotalHits(true).setQuery(QueryBuilders.termQuery("auto", true)).get(),
            numAutoGenDocs.get()
        );
    }

    private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) {
        Thread indexingThread = new Thread(() -> {
            while (finished.get() == false && numAutoGenDocs.get() < 10_000) {
                // index and delete a fixed-id doc, then add an auto-id doc tagged with "auto": true
                IndexResponse indexResponse = client().prepareIndex("idx1").setId("id").setSource("field", "value").get();
                assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
                DeleteResponse deleteResponse = client().prepareDelete("idx1", "id").get();
                assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
                client().prepareIndex("idx1").setSource("auto", true).get();
                numAutoGenDocs.incrementAndGet();
            }
        });
        indexingThread.start();
        return indexingThread;
    }
}
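For reference, a single run of this test with the standard OpenSearch Gradle setup would look something like the following (assuming this class lives in the server module's internalClusterTest source set, as migration integration tests usually do):

    ./gradlew ':server:internalClusterTest' --tests "org.opensearch.remotemigration.RemotePrimaryLocalRecoveryIT"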
@@ -33,6 +33,7 @@
package org.opensearch.common.blobstore;

import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;

import java.util.ArrayList;
import java.util.Collections;
@@ -44,6 +45,7 @@
*
* @opensearch.internal
*/
@PublicApi(since = "2.14.0")
public class BlobPath implements Iterable<String> {

private static final String SEPARATOR = "/";
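For readers of this hunk, BlobPath is the small path-building primitive that remote store code composes blob paths with; a minimal usage sketch, assuming the class's existing cleanPath(), add(), and buildAsString() methods:

    BlobPath path = BlobPath.cleanPath().add("indexUUID").add("0");
    String str = path.buildAsString(); // expected to yield "indexUUID/0/", with the trailing separator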
@@ -8,6 +8,8 @@

package org.opensearch.index.remote;

import org.opensearch.common.annotation.PublicApi;

import java.util.Set;

import static org.opensearch.index.remote.RemoteStoreDataEnums.DataType.DATA;
@@ -23,6 +25,7 @@ public class RemoteStoreDataEnums {
/**
* Categories of the data in Remote store.
*/
@PublicApi(since = "2.14.0")
public enum DataCategory {
SEGMENTS("segments", Set.of(DataType.values())),
TRANSLOG("translog", Set.of(DATA, METADATA));
@@ -47,6 +50,7 @@ public String getName() {
/**
* Types of data in remote store.
*/
@PublicApi(since = "2.14.0")
public enum DataType {
DATA("data"),
METADATA("metadata"),
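For orientation: RemoteStorePathType (below) combines these two enums when building remote store blob paths, so, for example, translog data lands under a translog/data suffix and segment metadata under segments/metadata.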
@@ -8,6 +8,7 @@

package org.opensearch.index.remote;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.remote.RemoteStoreDataEnums.DataCategory;
import org.opensearch.index.remote.RemoteStoreDataEnums.DataType;
@@ -20,6 +21,7 @@
*
* @opensearch.internal
*/
@PublicApi(since = "2.14.0")
public enum RemoteStorePathType {

FIXED {
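A sketch of what the FIXED strategy is expected to produce; the enum body is collapsed in this diff, so the layout below is an assumption based on the base-path/index-UUID/shard-id/category/type convention, with hypothetical values:

    // FIXED: <basePath>/<indexUUID>/<shardId>/<dataCategory>/<dataType>
    BlobPath base = BlobPath.cleanPath().add("base");
    // e.g. generatePath would yield base/9tB2kKwQ/0/translog/data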
34 changes: 21 additions & 13 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -153,11 +153,18 @@
import org.opensearch.index.search.stats.ShardSearchStats;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.RetentionLease;
import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import org.opensearch.index.seqno.RetentionLeaseStats;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_SEEDED;
import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_UNSEEDED;
import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_NON_MIGRATING;
import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
@@ -171,6 +178,8 @@
import org.opensearch.index.translog.RemoteFsTranslog;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.index.translog.Translog;
import static org.opensearch.index.translog.Translog.Durability;
import static org.opensearch.index.translog.Translog.TRANSLOG_UUID_KEY;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.index.translog.TranslogRecoveryRunner;
@@ -230,16 +239,6 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_SEEDED;
import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_UNSEEDED;
import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_NON_MIGRATING;
import static org.opensearch.index.translog.Translog.Durability;
import static org.opensearch.index.translog.Translog.TRANSLOG_UUID_KEY;

/**
* An OpenSearch index shard
*
@@ -2522,7 +2521,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
syncSegmentsFromRemoteSegmentStore(false);
}
if (shardRouting.primary()) {
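// during migration, a primary that has already seeded the remote store must also sync its translog from remote on open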
if (syncFromRemote) {
if (syncFromRemote || this.isRemoteSeeded()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
} else {
// we will enter this block when we do not want to recover from remote translog.
@@ -3670,7 +3669,7 @@ public void startRecovery(
RepositoriesService repositoriesService,
Consumer<MappingMetadata> mappingUpdateConsumer,
IndicesService indicesService
) {
) throws IOException {
// TODO: Create a proper object to encapsulate the recovery context
// all of the current methods here follow a pattern of:
// resolve context which isn't really dependent on the local shards and then async
@@ -3692,7 +3691,16 @@
switch (recoveryState.getRecoverySource().getType()) {
case EMPTY_STORE:
case EXISTING_STORE:
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
if (shouldSeedRemoteStore() && routingEntry().primary()) {
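// seeding path: wipe stale remote contents, upload the local translog (sync) and segments (refresh), then wait for the remote store to catch up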
deleteRemoteStoreContents();
// This cleans up remote translog's 0 generation, as we don't want to get that uploaded
sync();
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { refresh("local recovery during remote store migration"); });
waitForRemoteStoreSync();
logger.info("Remote Store is now seeded for {} after local recovery", shardId());
} else {
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
}
break;
case REMOTE_STORE:
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(l));
@@ -34,11 +34,14 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory {

private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;

private final boolean shouldSeedRemote;

public RemoteBlobStoreInternalTranslogFactory(
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
String repositoryName,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
boolean shouldSeedRemote
) {
Repository repository;
try {
Expand All @@ -49,6 +52,7 @@ public RemoteBlobStoreInternalTranslogFactory(
this.repository = repository;
this.threadPool = threadPool;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
this.shouldSeedRemote = shouldSeedRemote;
}

@Override
@@ -74,7 +78,8 @@ public Translog newTranslog(
blobStoreRepository,
threadPool,
startedPrimarySupplier,
remoteTranslogTransferTracker
remoteTranslogTransferTracker,
shouldSeedRemote
);
}

@@ -90,6 +90,8 @@ public class RemoteFsTranslog extends Translog {
private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT);
private final AtomicBoolean pauseSync = new AtomicBoolean(false);

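// when true, this translog seeds the remote store on open by uploading local generations instead of downloading remote ones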
private final boolean shouldSeedRemote;

public RemoteFsTranslog(
TranslogConfig config,
String translogUUID,
@@ -100,7 +102,8 @@ public RemoteFsTranslog(
BlobStoreRepository blobStoreRepository,
ThreadPool threadPool,
BooleanSupplier startedPrimarySupplier,
RemoteTranslogTransferTracker remoteTranslogTransferTracker
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
boolean shouldSeedRemote
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
logger = Loggers.getLogger(getClass(), shardId);
@@ -115,33 +118,38 @@
remoteTranslogTransferTracker,
indexSettings().getRemoteStorePathType()
);
this.shouldSeedRemote = shouldSeedRemote;
try {
download(translogTransferManager, location, logger);
Checkpoint checkpoint = readCheckpoint(location);
logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo);
this.readers.addAll(recoverFromFiles(checkpoint));
if (readers.isEmpty()) {
String errorMsg = String.format(Locale.ROOT, "%s at least one reader must be recovered", shardId);
logger.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
boolean success = false;
current = null;
try {
current = createWriter(
checkpoint.generation + 1,
getMinFileGeneration(),
checkpoint.globalCheckpoint,
persistedSequenceNumberConsumer
);
success = true;
} finally {
// we have to close all the recovered ones otherwise we leak file handles here
// for instance if we have a lot of tlog and we can't create the writer we keep
// on holding
// on to all the uncommitted tlog files if we don't close
if (success == false) {
IOUtils.closeWhileHandlingException(readers);
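// new behavior: when seeding, upload the local translog via sync() instead of downloading remote generations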
if (shouldSeedRemote) {
sync();
} else {
download(translogTransferManager, location, logger);
Checkpoint checkpoint = readCheckpoint(location);
logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo);
this.readers.addAll(recoverFromFiles(checkpoint));
if (readers.isEmpty()) {
String errorMsg = String.format(Locale.ROOT, "%s at least one reader must be recovered", shardId);
logger.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
boolean success = false;
current = null;
try {
current = createWriter(
checkpoint.generation + 1,
getMinFileGeneration(),
checkpoint.globalCheckpoint,
persistedSequenceNumberConsumer
);
success = true;
} finally {
// we have to close all the recovered ones otherwise we leak file handles here
// for instance if we have a lot of tlog and we can't create the writer we keep
// on holding
// on to all the uncommitted tlog files if we don't close
if (success == false) {
IOUtils.closeWhileHandlingException(readers);
}
}
}
} catch (Exception e) {
@@ -386,7 +394,7 @@ private boolean syncToDisk() throws IOException {

@Override
public void sync() throws IOException {
if (syncToDisk() || syncNeeded()) {
if (syncToDisk() || syncNeeded() || shouldSeedRemote) {
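// shouldSeedRemote forces an upload even when nothing new hit disk, so the first sync() after opening seeds the remote store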
prepareAndUpload(primaryTermSupplier.getAsLong(), null);
}
}
@@ -525,6 +525,7 @@ TranslogWriter createWriter(
tragedy,
persistedSequenceNumberConsumer,
bigArrays,
indexSettings.isRemoteTranslogStoreEnabled(),
indexSettings.isRemoteNode()
);
} catch (final IOException e) {
@@ -2043,6 +2044,7 @@ public static String createEmptyTranslog(
throw new UnsupportedOperationException();
},
BigArrays.NON_RECYCLING_INSTANCE,
null,
null
);
writer.close();
(remaining changed files not shown)
