Skip to content

Commit

Permalink
handle unexpected exception on success callback of translog upload (o…
Browse files Browse the repository at this point in the history
…pensearch-project#12577) (opensearch-project#12684)

* handle unexpected exception on success callback of translog upload


(cherry picked from commit 4a0feee)

Signed-off-by: Varun Bansal <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 204f354 commit 88f2891
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.index.translog.transfer;

import org.apache.logging.log4j.Logger;
import org.opensearch.common.logging.Loggers;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
Expand All @@ -33,11 +35,13 @@ public class FileTransferTracker implements FileTransferListener {
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;
private Map<String, Long> bytesForTlogCkpFileToUpload;
private long fileTransferStartTime = -1;
private final Logger logger;

public FileTransferTracker(ShardId shardId, RemoteTranslogTransferTracker remoteTranslogTransferTracker) {
this.shardId = shardId;
this.fileTransferTracker = new ConcurrentHashMap<>();
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
this.logger = Loggers.getLogger(getClass(), shardId);
}

void recordFileTransferStartTime(long uploadStartTime) {
Expand All @@ -64,9 +68,14 @@ long getTotalBytesToUpload() {

@Override
public void onSuccess(TransferFileSnapshot fileSnapshot) {
long durationInMillis = (System.nanoTime() - fileTransferStartTime) / 1_000_000L;
remoteTranslogTransferTracker.addUploadTimeInMillis(durationInMillis);
remoteTranslogTransferTracker.addUploadBytesSucceeded(bytesForTlogCkpFileToUpload.get(fileSnapshot.getName()));
try {
long durationInMillis = (System.nanoTime() - fileTransferStartTime) / 1_000_000L;
remoteTranslogTransferTracker.addUploadTimeInMillis(durationInMillis);
remoteTranslogTransferTracker.addUploadBytesSucceeded(bytesForTlogCkpFileToUpload.get(fileSnapshot.getName()));
} catch (Exception ex) {
logger.error("Failure to update translog upload success stats", ex);
}

add(fileSnapshot.getName(), TransferState.SUCCESS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import java.util.List;
import java.util.Set;

import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

public class FileTransferTrackerTests extends OpenSearchTestCase {

protected final ShardId shardId = new ShardId("index", "_na_", 1);
Expand Down Expand Up @@ -94,6 +98,32 @@ public void testOnFailure() throws IOException {
}
}

public void testOnSuccessStatsFailure() throws IOException {
RemoteTranslogTransferTracker localRemoteTranslogTransferTracker = spy(remoteTranslogTransferTracker);
doAnswer((count) -> { throw new NullPointerException("Error while updating stats"); }).when(localRemoteTranslogTransferTracker)
.addUploadBytesSucceeded(anyLong());

FileTransferTracker localFileTransferTracker = new FileTransferTracker(shardId, localRemoteTranslogTransferTracker);

Path testFile = createTempFile();
int fileSize = 128;
Files.write(testFile, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND);
try (
FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(
testFile,
randomNonNegativeLong(),
null
);
) {
Set<FileSnapshot.TransferFileSnapshot> toUpload = new HashSet<>(2);
toUpload.add(transferFileSnapshot);
localFileTransferTracker.recordBytesForFiles(toUpload);
localRemoteTranslogTransferTracker.addUploadBytesStarted(fileSize);
localFileTransferTracker.onSuccess(transferFileSnapshot);
assertEquals(localFileTransferTracker.allUploaded().size(), 1);
}
}

public void testUploaded() throws IOException {
Path testFile = createTempFile();
int fileSize = 128;
Expand Down

0 comments on commit 88f2891

Please sign in to comment.