Skip to content

Commit

Permalink
Changes to support retrieval of operations from translog based on spe…
Browse files Browse the repository at this point in the history
…cified range (#1257)

Backport changes to support retrieval of operations from translog based on specified range

Signed-off-by: Sai Kumar <[email protected]>
  • Loading branch information
saikaranam-amazon authored Sep 26, 2021
1 parent eba365c commit 15e9f13
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 5 deletions.
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,15 @@ public enum SearcherScope {
public abstract Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException;

/**
* Creates a new history snapshot from either Lucene/Translog for reading operations whose seqno in the requesting
* seqno range (both inclusive).
*/
public Translog.Snapshot newChangesSnapshot(String source, HistorySource historySource, MapperService mapperService,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange);
}

/**
* Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive).
* The returned snapshot can be retrieved from either Lucene index or translog files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2702,6 +2702,17 @@ private void ensureSoftDeletesEnabled() {
}
}

@Override
public Translog.Snapshot newChangesSnapshot(String source, HistorySource historySource, MapperService mapperService,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
if(historySource == HistorySource.INDEX) {
return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange);
} else {
return getTranslog().newSnapshot(fromSeqNo, toSeqNo, requiredFullRange);
}
}


@Override
public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
*/
public final class MissingHistoryOperationsException extends IllegalStateException {

MissingHistoryOperationsException(String message) {
public MissingHistoryOperationsException(String message) {
super(message);
}
}
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2011,6 +2011,17 @@ public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySourc
return getEngine().readHistoryOperations(reason, source, mapperService, startingSeqNo);
}

/**
*
* Creates a new history snapshot for reading operations since
* the provided starting seqno (inclusive) and ending seqno (inclusive)
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source,
long startingSeqNo, long endSeqNo) throws IOException {
return getEngine().newChangesSnapshot(reason, source, mapperService, startingSeqNo, endSeqNo, true);
}

/**
* Checks if we have a completed history of operations since the given starting seqno (inclusive).
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock(Engine.HistorySource)}
Expand Down
21 changes: 17 additions & 4 deletions server/src/main/java/org/opensearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.MissingHistoryOperationsException;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.AbstractIndexShardComponent;
import org.opensearch.index.shard.IndexShardComponent;
Expand Down Expand Up @@ -617,14 +618,18 @@ public Snapshot newSnapshot() throws IOException {
return newSnapshot(0, Long.MAX_VALUE);
}

public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException {
return newSnapshot(fromSeqNo, toSeqNo, false);
}

/**
* Creates a new translog snapshot containing operations from the given range.
*
* @param fromSeqNo the lower bound of the range (inclusive)
* @param toSeqNo the upper bound of the range (inclusive)
* @return the new snapshot
*/
public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException {
public Snapshot newSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
assert fromSeqNo <= toSeqNo : fromSeqNo + " > " + toSeqNo;
assert fromSeqNo >= 0 : "from_seq_no must be non-negative " + fromSeqNo;
try (ReleasableLock ignored = readLock.acquire()) {
Expand All @@ -633,7 +638,7 @@ public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException {
.filter(reader -> reader.getCheckpoint().minSeqNo <= toSeqNo && fromSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo())
.map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
final Snapshot snapshot = newMultiSnapshot(snapshots);
return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo);
return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo, requiredFullRange);
}
}

Expand Down Expand Up @@ -959,14 +964,17 @@ default int skippedOperations() {
private static final class SeqNoFilterSnapshot implements Snapshot {
private final Snapshot delegate;
private int filteredOpsCount;
private int opsCount;
private boolean requiredFullRange;
private final long fromSeqNo; // inclusive
private final long toSeqNo; // inclusive

SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo) {
SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo, boolean requiredFullRange) {
assert fromSeqNo <= toSeqNo : "from_seq_no[" + fromSeqNo + "] > to_seq_no[" + toSeqNo + "]";
this.delegate = delegate;
this.fromSeqNo = fromSeqNo;
this.toSeqNo = toSeqNo;
this.requiredFullRange = requiredFullRange;
}

@Override
Expand All @@ -980,15 +988,20 @@ public int skippedOperations() {
}

@Override
public Operation next() throws IOException {
public Operation next() throws IOException, MissingHistoryOperationsException {
Translog.Operation op;
while ((op = delegate.next()) != null) {
if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo) {
opsCount++;
return op;
} else {
filteredOpsCount++;
}
}
if(requiredFullRange && (toSeqNo - fromSeqNo +1) != opsCount) {
throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
"and to_seqno [" + toSeqNo + "] found");
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5417,6 +5417,48 @@ public void testTrimUnsafeCommits() throws Exception {
}
}

public void testHistoryBasedOnSource() throws Exception {
final List<Engine.Operation> operations = generateSingleDocHistory(false,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "1");
final MergePolicy keepSoftDeleteDocsMP = new SoftDeletesRetentionMergePolicy(
Lucene.SOFT_DELETES_FIELD, () -> new MatchAllDocsQuery(), engine.config().getMergePolicy());
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
Set<Long> expectedSeqNos = new HashSet<>();
try (Store store = createStore();
Engine engine = createEngine(config(indexSettings, store, createTempDir(), keepSoftDeleteDocsMP, null))) {
for (Engine.Operation op : operations) {
if (op instanceof Engine.Index) {
Engine.IndexResult indexResult = engine.index((Engine.Index) op);
assertThat(indexResult.getFailure(), nullValue());
expectedSeqNos.add(indexResult.getSeqNo());
} else {
Engine.DeleteResult deleteResult = engine.delete((Engine.Delete) op);
assertThat(deleteResult.getFailure(), nullValue());
expectedSeqNos.add(deleteResult.getSeqNo());
}
if (rarely()) {
engine.refresh("test");
}
if (rarely()) {
engine.flush();
}
if (rarely()) {
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
}
}
MapperService mapperService = createMapperService("test");
List<Translog.Operation> luceneOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.INDEX, mapperService);
List<Translog.Operation> translogOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.TRANSLOG, mapperService);
assertThat(luceneOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
assertThat(translogOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
}
}

public void testLuceneHistoryOnPrimary() throws Exception {
final List<Engine.Operation> operations = generateSingleDocHistory(false,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.opensearch.index.VersionType;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.Engine.Operation.Origin;
import org.opensearch.index.engine.MissingHistoryOperationsException;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.ParseContext.Document;
import org.opensearch.index.mapper.ParsedDocument;
Expand Down Expand Up @@ -120,6 +121,7 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -737,6 +739,72 @@ public void testRangeSnapshot() throws Exception {
}
}

private Long populateTranslogOps(boolean withMissingOps) throws IOException {
long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
final int generations = between(2, 20);
long currentSeqNo = 0L;
List<Translog.Operation> firstGenOps = null;
Map<Long, List<Translog.Operation>> operationsByGen = new HashMap<>();
for (int gen = 0; gen < generations; gen++) {
List<Long> seqNos = new ArrayList<>();
int numOps = randomIntBetween(4, 10);
for (int i = 0; i < numOps; i++, currentSeqNo++) {
minSeqNo = SequenceNumbers.min(minSeqNo, currentSeqNo);
maxSeqNo = SequenceNumbers.max(maxSeqNo, currentSeqNo);
seqNos.add(currentSeqNo);
}
Collections.shuffle(seqNos, new Random(100));
List<Translog.Operation> ops = new ArrayList<>(seqNos.size());
for (long seqNo : seqNos) {
Translog.Index op = new Translog.Index("_doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{randomByte()});
boolean shouldAdd = !withMissingOps || seqNo % 4 != 0;
if(shouldAdd) {
translog.add(op);
ops.add(op);
}
}
operationsByGen.put(translog.currentFileGeneration(), ops);
if(firstGenOps == null) {
firstGenOps = ops;
}
translog.rollGeneration();
if (rarely()) {
translog.rollGeneration(); // empty generation
}
}
return currentSeqNo;
}

public void testFullRangeSnapshot() throws Exception {
// Successful snapshot
long nextSeqNo = populateTranslogOps(false);
long fromSeqNo = 0L;
long toSeqNo = Math.min(nextSeqNo - 1, fromSeqNo + 15);
try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo, true)) {
int totOps = 0;
for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
totOps++;
}
assertEquals(totOps, toSeqNo - fromSeqNo + 1);
}
}

public void testFullRangeSnapshotWithFailures() throws Exception {
long nextSeqNo = populateTranslogOps(true);
long fromSeqNo = 0L;
long toSeqNo = Math.min(nextSeqNo-1, fromSeqNo + 15);
try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo, true)) {
int totOps = 0;
for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
totOps++;
}
fail("Should throw exception for missing operations");
} catch(MissingHistoryOperationsException e) {
assertTrue(e.getMessage().contains("Not all operations between from_seqno"));
}
}

public void assertFileIsPresent(Translog translog, long id) {
if (Files.exists(translog.location().resolve(Translog.getFilename(id)))) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,22 @@ public static List<Translog.Operation> readAllOperationsInLucene(Engine engine,
return operations;
}

/**
* Reads all engine operations that have been processed by the engine from Lucene index/Translog based on source.
*/
public static List<Translog.Operation> readAllOperationsBasedOnSource(Engine engine, Engine.HistorySource historySource,
MapperService mapper) throws IOException {
final List<Translog.Operation> operations = new ArrayList<>();
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", historySource, mapper,
0, Long.MAX_VALUE, false)) {
Translog.Operation op;
while ((op = snapshot.next()) != null){
operations.add(op);
}
}
return operations;
}

/**
* Asserts the provided engine has a consistent document history between translog and Lucene index.
*/
Expand Down

0 comments on commit 15e9f13

Please sign in to comment.