Skip to content

Commit

Permalink
Adjust translog operation assertions for synthetic source
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Dec 28, 2024
1 parent d008ada commit 50240da
Show file tree
Hide file tree
Showing 12 changed files with 242 additions and 74 deletions.
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,6 @@ tests:
- class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT
method: testFailureLoadingFields
issue: https://github.com/elastic/elasticsearch/issues/118000
- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT
method: test {yaml=indices.create/20_synthetic_source/create index with use_synthetic_source}
issue: https://github.com/elastic/elasticsearch/issues/119191
- class: org.elasticsearch.index.mapper.AbstractShapeGeometryFieldMapperTests
method: testCartesianBoundsBlockLoader
issue: https://github.com/elastic/elasticsearch/issues/119201
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,8 @@ private Translog openTranslog(
translogDeletionPolicy,
globalCheckpointSupplier,
engineConfig.getPrimaryTermSupplier(),
persistedSequenceNumberConsumer
persistedSequenceNumberConsumer,
TranslogOperationAsserter.withEngineConfig(engineConfig)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ public void trimUnreferencedTranslogFiles() {
translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(),
engineConfig.getPrimaryTermSupplier(),
seqNo -> {}
seqNo -> {},
TranslogOperationAsserter.DEFAULT
)
) {
translog.trimUnreferencedReaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
translogDeletionPolicy,
config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(),
seqNo -> {}
seqNo -> {},
TranslogOperationAsserter.DEFAULT
)
) {
return translog.stats();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.elasticsearch.index.mapper.DocumentParser;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.Objects;

/**
 * A utility class to assert that translog operations with the same sequence number
 * in the same generation are either identical or equivalent when synthetic source is used.
 */
public abstract class TranslogOperationAsserter {

    /**
     * The strict asserter: two index operations are considered the same only if their
     * id, source, routing, primary term, seqNo, and version all match exactly.
     */
    public static final TranslogOperationAsserter DEFAULT = new TranslogOperationAsserter() {
    };

    // Instances are created only via DEFAULT / withEngineConfig; the class is not
    // designed for external extension.
    private TranslogOperationAsserter() {

    }

    /**
     * Returns an asserter that also accepts an index operation whose source, after being
     * re-synthesized through a synthetic-source snapshot built from the given engine's
     * mappings, matches the other operation. With synthetic source, the source recorded
     * in the translog can differ in form (but not content) from the source reconstructed
     * from Lucene, so a byte-for-byte comparison alone is too strict.
     *
     * @param engineConfig supplies the shard id, mapper service, and index settings used
     *                     to synthesize the source
     */
    public static TranslogOperationAsserter withEngineConfig(EngineConfig engineConfig) {
        return new TranslogOperationAsserter() {
            /**
             * Re-synthesizes the source of {@code op} by exposing the operation through an
             * in-memory {@link TranslogDirectoryReader} and reading it back via a
             * {@link LuceneSyntheticSourceChangesSnapshot} over exactly [seqNo, seqNo].
             */
            Translog.Index synthesizeSource(Translog.Index op) throws IOException {
                final ShardId shardId = engineConfig.getShardId();
                final MappingLookup mappingLookup = engineConfig.getMapperService().mappingLookup();
                final DocumentParser documentParser = engineConfig.getMapperService().documentParser();
                try (var reader = new TranslogDirectoryReader(shardId, op, mappingLookup, documentParser, engineConfig, () -> {})) {
                    // The searcher is closed by the snapshot below (which takes ownership).
                    final Engine.Searcher searcher = new Engine.Searcher(
                        "assert_translog",
                        reader,
                        engineConfig.getSimilarity(),
                        engineConfig.getQueryCache(),
                        engineConfig.getQueryCachingPolicy(),
                        () -> {}
                    );
                    try (
                        LuceneSyntheticSourceChangesSnapshot snapshot = new LuceneSyntheticSourceChangesSnapshot(
                            mappingLookup,
                            searcher,
                            LuceneSyntheticSourceChangesSnapshot.DEFAULT_BATCH_SIZE,
                            Integer.MAX_VALUE,
                            op.seqNo(),
                            op.seqNo(),
                            true,
                            false,
                            engineConfig.getIndexSettings().getIndexVersionCreated()
                        )
                    ) {
                        final Translog.Operation normalized = snapshot.next();
                        assert normalized != null : "expected one operation; got zero";
                        return (Translog.Index) normalized;
                    }
                }
            }

            @Override
            boolean sameIndexOperation(Translog.Index o1, Translog.Index o2) throws IOException {
                // Equivalent if identical, or if either operation matches the other after
                // its source has been re-synthesized. || short-circuits, so the expensive
                // synthesis only runs when the strict comparison fails.
                return super.sameIndexOperation(o1, o2)
                    || super.sameIndexOperation(synthesizeSource(o1), o2)
                    || super.sameIndexOperation(o1, synthesizeSource(o2));
            }
        };
    }

    /**
     * Strict comparison of two index operations: id, source, routing, primary term,
     * seqNo, and version must all match exactly.
     */
    boolean sameIndexOperation(Translog.Index o1, Translog.Index o2) throws IOException {
        // TODO: We haven't had timestamp for Index operations in Lucene yet, we need to loosen this check without timestamp.
        return Objects.equals(o1.id(), o2.id())
            && Objects.equals(o1.source(), o2.source())
            && Objects.equals(o1.routing(), o2.routing())
            && o1.primaryTerm() == o2.primaryTerm()
            && o1.seqNo() == o2.seqNo()
            && o1.version() == o2.version();
    }

    /**
     * Asserts that {@code newOp} and {@code prvOp}, both recorded for {@code seqNo} in
     * translog generation {@code generation}, represent the same operation.
     *
     * @param prevFailure the failure (if any) captured when the previous operation was
     *                    recorded; attached as the cause of the AssertionError
     * @return always {@code true}, so callers can invoke this inside an {@code assert}
     * @throws AssertionError if the two operations differ
     */
    public boolean assertSameOperations(
        long seqNo,
        long generation,
        Translog.Operation newOp,
        Translog.Operation prvOp,
        Exception prevFailure
    ) throws IOException {
        final boolean sameOp;
        // Bind operands with consistent prev*/new* names and pass the previous operation
        // first, matching sameIndexOperation(o1 = previous, o2 = new).
        if (prvOp instanceof final Translog.Index prevIndex && newOp instanceof final Translog.Index newIndex) {
            sameOp = sameIndexOperation(prevIndex, newIndex);
        } else if (prvOp instanceof final Translog.Delete prevDelete && newOp instanceof final Translog.Delete newDelete) {
            sameOp = Objects.equals(prevDelete.id(), newDelete.id())
                && prevDelete.primaryTerm() == newDelete.primaryTerm()
                && prevDelete.seqNo() == newDelete.seqNo()
                && prevDelete.version() == newDelete.version();
        } else {
            // A seqNo must never be reused by operations of different types.
            sameOp = false;
        }
        if (sameOp == false) {
            throw new AssertionError(
                "seqNo ["
                    + seqNo
                    + "] was processed twice in generation ["
                    + generation
                    + "], with different data. "
                    + "prvOp ["
                    + prvOp
                    + "], newOp ["
                    + newOp
                    + "]",
                prevFailure
            );
        }
        return true;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.TranslogOperationAsserter;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Uid;
Expand Down Expand Up @@ -123,6 +124,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final TranslogDeletionPolicy deletionPolicy;
private final LongConsumer persistedSequenceNumberConsumer;
private final OperationListener operationListener;
private final TranslogOperationAsserter operationAsserter;

/**
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is
Expand Down Expand Up @@ -150,14 +152,16 @@ public Translog(
TranslogDeletionPolicy deletionPolicy,
final LongSupplier globalCheckpointSupplier,
final LongSupplier primaryTermSupplier,
final LongConsumer persistedSequenceNumberConsumer
final LongConsumer persistedSequenceNumberConsumer,
final TranslogOperationAsserter operationAsserter
) throws IOException {
super(config.getShardId(), config.getIndexSettings());
this.config = config;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.primaryTermSupplier = primaryTermSupplier;
this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer;
this.operationListener = config.getOperationListener();
this.operationAsserter = operationAsserter;
this.deletionPolicy = deletionPolicy;
this.translogUUID = translogUUID;
this.bigArrays = config.getBigArrays();
Expand Down Expand Up @@ -586,6 +590,7 @@ TranslogWriter createWriter(
bigArrays,
diskIoBufferPool,
operationListener,
operationAsserter,
config.fsync()
);
} catch (final IOException e) {
Expand Down Expand Up @@ -1962,6 +1967,7 @@ public static String createEmptyTranslog(
BigArrays.NON_RECYCLING_INSTANCE,
DiskIoBufferPool.INSTANCE,
TranslogConfig.NOOP_OPERATION_LISTENER,
TranslogOperationAsserter.DEFAULT,
true
);
writer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.engine.TranslogOperationAsserter;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;

Expand All @@ -39,7 +40,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongConsumer;
Expand Down Expand Up @@ -69,6 +69,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
// callback that's called whenever an operation with a given sequence number is successfully persisted.
private final LongConsumer persistedSequenceNumberConsumer;
private final OperationListener operationListener;
private final TranslogOperationAsserter operationAsserter;
private final boolean fsync;

protected final AtomicBoolean closed = new AtomicBoolean(false);
Expand Down Expand Up @@ -108,6 +109,7 @@ private TranslogWriter(
BigArrays bigArrays,
DiskIoBufferPool diskIoBufferPool,
OperationListener operationListener,
TranslogOperationAsserter operationAsserter,
boolean fsync
) throws IOException {
super(initialCheckpoint.generation, channel, path, header);
Expand Down Expand Up @@ -136,6 +138,7 @@ private TranslogWriter(
this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
this.tragedy = tragedy;
this.operationListener = operationListener;
this.operationAsserter = operationAsserter;
this.fsync = fsync;
this.lastModifiedTimeCache = new LastModifiedTimeCache(-1, -1, -1);
}
Expand All @@ -157,6 +160,7 @@ public static TranslogWriter create(
BigArrays bigArrays,
DiskIoBufferPool diskIoBufferPool,
OperationListener operationListener,
TranslogOperationAsserter operationAsserter,
boolean fsync
) throws IOException {
final Path checkpointFile = file.getParent().resolve(Translog.CHECKPOINT_FILE_NAME);
Expand Down Expand Up @@ -201,6 +205,7 @@ public static TranslogWriter create(
bigArrays,
diskIoBufferPool,
operationListener,
operationAsserter,
fsync
);
} catch (Exception exception) {
Expand Down Expand Up @@ -276,38 +281,7 @@ private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReferenc
Translog.Operation prvOp = Translog.readOperation(
new BufferedChecksumStreamInput(previous.v1().streamInput(), "assertion")
);
// TODO: We haven't had timestamp for Index operations in Lucene yet, we need to loosen this check without timestamp.
final boolean sameOp;
if (newOp instanceof final Translog.Index o2 && prvOp instanceof final Translog.Index o1) {
sameOp = Objects.equals(o1.id(), o2.id())
&& Objects.equals(o1.source(), o2.source())
&& Objects.equals(o1.routing(), o2.routing())
&& o1.primaryTerm() == o2.primaryTerm()
&& o1.seqNo() == o2.seqNo()
&& o1.version() == o2.version();
} else if (newOp instanceof final Translog.Delete o1 && prvOp instanceof final Translog.Delete o2) {
sameOp = Objects.equals(o1.id(), o2.id())
&& o1.primaryTerm() == o2.primaryTerm()
&& o1.seqNo() == o2.seqNo()
&& o1.version() == o2.version();
} else {
sameOp = false;
}
if (sameOp == false) {
throw new AssertionError(
"seqNo ["
+ seqNo
+ "] was processed twice in generation ["
+ generation
+ "], with different data. "
+ "prvOp ["
+ prvOp
+ "], newOp ["
+ newOp
+ "]",
previous.v2()
);
}
assert operationAsserter.assertSameOperations(seqNo, generation, newOp, prvOp, previous.v2());
}
} else {
seenSequenceNumbers.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.TranslogOperationAsserter;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommand;
import org.elasticsearch.index.shard.ShardPath;
Expand Down Expand Up @@ -171,7 +172,8 @@ private static boolean isTranslogClean(ShardPath shardPath, ClusterState cluster
translogDeletionPolicy,
() -> translogGlobalCheckpoint,
() -> primaryTerm,
seqNo -> {}
seqNo -> {},
TranslogOperationAsserter.DEFAULT
);
Translog.Snapshot snapshot = translog.newSnapshot(0, Long.MAX_VALUE)
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3568,7 +3568,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
new TranslogDeletionPolicy(),
() -> SequenceNumbers.NO_OPS_PERFORMED,
primaryTerm::get,
seqNo -> {}
seqNo -> {},
TranslogOperationAsserter.DEFAULT
);
translog.add(TranslogOperationsUtils.indexOp("SomeBogusId", 0, primaryTerm.get()));
assertEquals(generation.translogFileGeneration(), translog.currentFileGeneration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.engine.TranslogOperationAsserter;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.mockito.Mockito;
Expand Down Expand Up @@ -95,6 +96,7 @@ private Tuple<List<TranslogReader>, TranslogWriter> createReadersAndWriter() thr
BigArrays.NON_RECYCLING_INSTANCE,
TranslogTests.RANDOMIZING_IO_BUFFERS,
TranslogConfig.NOOP_OPERATION_LISTENER,
TranslogOperationAsserter.DEFAULT,
true
);
writer = Mockito.spy(writer);
Expand Down
Loading

0 comments on commit 50240da

Please sign in to comment.