Skip to content

Commit

Permalink
added the unit tests for incremental download of translog
Browse files Browse the repository at this point in the history
This commit adds the unit tests applicable for the changes made to support incremental download of translog files. Primarily, the unit tests cover the changes in-
- TranslogFooter to write and read the footer of Translog
- RemoteFSTranslog to skip the download of locally present translog
- TranslogWriter to create reader with checksum upon close
- TranslogReader closeIntoReader functionality

Signed-off-by: Harsh Rawat <[email protected]>
  • Loading branch information
Harsh Rawat committed Oct 9, 2024
1 parent 83e81c3 commit ce18c9b
Show file tree
Hide file tree
Showing 3 changed files with 316 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.DataOutput;
Expand All @@ -19,12 +20,14 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.bytes.ReleasableBytesReference;
import org.opensearch.common.io.Channels;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -75,6 +78,7 @@
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -110,6 +114,8 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@LuceneTestCase.SuppressFileSystems("ExtrasFS")
Expand Down Expand Up @@ -1679,6 +1685,24 @@ public void testCloseIntoReader() throws IOException {
final int value = buffer.getInt();
assertEquals(i, value);
}

// Try to read into the footer which would lead into EOF exception.
assertThrowsReadingIntoFooter(reader, numOps, 4);
// Try to read beyond the footer which would lead into EOF exception.
assertThrowsReadingIntoFooter(reader, numOps, 18);

// Read next 16 bytes directly from the file, which should be the footer.
// This is because for this test, we create a writer which would automatically
// create one with footer.
long translogLengthWithoutFooter = reader.length - TranslogFooter.footerLength();
ByteBuffer footerBuffer = ByteBuffer.allocate(TranslogFooter.footerLength());
Channels.readFromFileChannelWithEofException(reader.channel, translogLengthWithoutFooter, footerBuffer);
footerBuffer.flip();
// Validate the footer.
assertEquals(CodecUtil.FOOTER_MAGIC, footerBuffer.getInt());
assertEquals(0, footerBuffer.getInt()); // Algorithm ID
assertEquals(reader.getTranslogChecksum().longValue(), footerBuffer.getLong());

final Checkpoint readerCheckpoint = reader.getCheckpoint();
assertThat(readerCheckpoint, equalTo(writerCheckpoint));
} finally {
Expand All @@ -1687,6 +1711,12 @@ public void testCloseIntoReader() throws IOException {
}
}

// assertThrowsReadingIntoFooter asserts EOF error when we try reading into the Translog footer via reader.
private void assertThrowsReadingIntoFooter(TranslogReader reader, int numOps, int bytesToRead) {
final ByteBuffer buffer = ByteBuffer.allocate(bytesToRead);
assertThrows(EOFException.class, () -> reader.readBytes(buffer, reader.getFirstOperationOffset() + numOps * 4));
}

public void testDownloadWithRetries() throws IOException {
long generation = 1, primaryTerm = 1;
Path location = createTempDir();
Expand Down Expand Up @@ -1805,6 +1835,110 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException {
assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload);
}

/**
* createTranslogFile creates a translog file with the given generation and checksum at the provided location.
* */
private void createTranslogFile(Path location, long generation, long checksum) throws IOException {
Path translogPath = location.resolve(Translog.getFilename(generation));
Path checkpointPath = location.resolve(Translog.getCommitCheckpointFileName(generation));
Files.createFile(translogPath);
Files.createFile(checkpointPath);
try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) {
// Write a translog header
TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), generation);
header.write(channel, true);

// Write some translog operations
byte[] operationBytes = new byte[] { 1, 2, 3, 4 };
channel.write(ByteBuffer.wrap(operationBytes));

// Write the translog footer
TranslogFooter.write(channel, checksum, true);
}
}

/**
* testIncrementalDownloadWithMatchingChecksum tests the scenario where we have the translog
* file present locally. We test if the download logic for the same skips the download of the file.
* */
public void testIncrementalDownloadWithMatchingChecksum() throws IOException {
// Set up the test scenario
long generation = 1;
long primaryTerm = 1;
long checksum = 1234;
Path location = createTempDir();
TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation, generation, 1);
Map<String, String> generationToPrimaryTermMapper = new HashMap<>();
Map<String, String> generationToChecksumMapper = new HashMap<>();
generationToPrimaryTermMapper.put(String.valueOf(generation), String.valueOf(primaryTerm));
generationToChecksumMapper.put(String.valueOf(generation), String.valueOf(checksum));
translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper);
translogTransferMetadata.setGenerationToChecksumMapper(generationToChecksumMapper);

// Mock the transfer manager.
TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);
when(mockTransfer.readMetadata(0)).thenReturn(translogTransferMetadata);
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);

// Create a local translog file with the same checksum as the remote
createTranslogFile(location, generation, checksum);

// Verify that the download is skipped
RemoteFsTranslog.download(mockTransfer, location, logger, false, 0);
verify(mockTransfer, times(0)).downloadTranslog(any(), any(), any());
}

/**
* testIncrementalDownloadWithDifferentChecksum tests the case where we have 2 translog generations
* in remote but only 1 present locally. We will download only 1 generation in this case.
* */
public void testIncrementalDownloadWithDifferentChecksum() throws IOException {
// Set up the test scenario
long generation1 = 1, generation2 = 2, primaryTerm = 1;
long checksum1 = 1234, checksum2 = 5678;
Path location = createTempDir();

TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation2, generation1, 2);
Map<String, String> generationToPrimaryTermMapper = Map.of(
String.valueOf(generation1),
String.valueOf(primaryTerm),
String.valueOf(generation2),
String.valueOf(primaryTerm)
);
Map<String, String> generationToChecksumMapper = Map.of(
String.valueOf(generation1),
String.valueOf(checksum1),
String.valueOf(generation2),
String.valueOf(checksum2)
);
translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper);
translogTransferMetadata.setGenerationToChecksumMapper(generationToChecksumMapper);

// Mock the transfer manager.
TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);
when(mockTransfer.readMetadata(0)).thenReturn(translogTransferMetadata);
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);

// Create a local translog file for 1 generation.
createTranslogFile(location, generation1, checksum1);
// Download counter to count the files which were downloaded.
AtomicLong downloadCounter = new AtomicLong();
// Mock the download of second generation.
doAnswer(invocation -> {
downloadCounter.incrementAndGet();
Files.createFile(location.resolve(Translog.getCommitCheckpointFileName(generation2)));
return true;
}).when(mockTransfer).downloadTranslog(String.valueOf(primaryTerm), String.valueOf(generation2), location);

// Verify that only generation 2 is downloaded.
RemoteFsTranslog.download(mockTransfer, location, logger, false, 0);
assertEquals(1, downloadCounter.get());
// verify that generation 1 is not downloaded.
verify(mockTransfer, times(0)).downloadTranslog(String.valueOf(primaryTerm), String.valueOf(generation1), location);
}

public void testSyncWithGlobalCheckpointUpdate() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.index.translog;

import org.apache.lucene.codecs.CodecUtil;
import org.opensearch.common.UUIDs;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TranslogFooterTests extends OpenSearchTestCase {

/**
* testTranslogFooterWrite verifies the functionality of TranslogFooter.write() method
* wherein we write the footer to the translog file.
* */
public void testTranslogFooterWrite() throws IOException {
Path translogPath = createTempFile();
try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) {
// Write a translog header
TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), randomNonNegativeLong());
header.write(channel, true);

// Write some translog operations
byte[] operationBytes = new byte[] { 1, 2, 3, 4 };
channel.write(ByteBuffer.wrap(operationBytes));

// Write the translog footer
long expectedChecksum = 0x1234567890ABCDEFL;
byte[] footer = TranslogFooter.write(channel, expectedChecksum, true);

// Verify the footer contents
ByteBuffer footerBuffer = ByteBuffer.wrap(footer);
assertEquals(CodecUtil.FOOTER_MAGIC, footerBuffer.getInt());
assertEquals(0, footerBuffer.getInt());
assertEquals(expectedChecksum, footerBuffer.getLong());

// Verify that the footer was written to the channel
assertEquals(footer.length, channel.size() - (header.sizeInBytes() + operationBytes.length));
}
}

/**
* testTranslogFooterReadChecksum verifies the behavior of the TranslogFooter.readChecksum() method,
* which reads the checksum from the footer of a translog file.
* */
public void testTranslogFooterReadChecksum() throws IOException {
long expectedChecksum = 0x1234567890ABCDEFL;
Path translogPath = createTempFile();
try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) {
// Write a translog header
TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), randomNonNegativeLong());
header.write(channel, true);

// Write some translog operations
byte[] operationBytes = new byte[] { 1, 2, 3, 4 };
channel.write(ByteBuffer.wrap(operationBytes));

// Write the translog footer.
TranslogFooter.write(channel, expectedChecksum, true);
}

// Verify that the checksum can be read correctly
Long actualChecksum = TranslogFooter.readChecksum(translogPath);
assert actualChecksum != null;
assertEquals(expectedChecksum, actualChecksum.longValue());
}
}
Loading

0 comments on commit ce18c9b

Please sign in to comment.