Skip to content

Commit

Permalink
Process translog.tlog and translog.ckp files Snapshot as one for remo…
Browse files Browse the repository at this point in the history
…te upload

Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed Apr 17, 2024
1 parent c56cf29 commit 84f9c92
Show file tree
Hide file tree
Showing 9 changed files with 282 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
Expand All @@ -32,11 +33,15 @@
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC;

Expand All @@ -49,12 +54,35 @@ public class BlobStoreTransferService implements TransferService {

private final BlobStore blobStore;
private final ThreadPool threadPool;
private ConcurrentHashMap<String, FileTransferTracker.TransferState> fileTransferTrackerCache;

private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class);

public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) {
this.blobStore = blobStore;
this.threadPool = threadPool;
fileTransferTrackerCache = new ConcurrentHashMap<>();
}

@Override
public ConcurrentHashMap<String, FileTransferTracker.TransferState> getFileTransferTrackerCache() {
return fileTransferTrackerCache;
}

void add(String file, boolean success) {
FileTransferTracker.TransferState targetState = success
? FileTransferTracker.TransferState.SUCCESS
: FileTransferTracker.TransferState.FAILED;
add(file, targetState);
}

private void add(String file, FileTransferTracker.TransferState targetState) {
fileTransferTrackerCache.compute(file, (k, v) -> {
if (v == null || v.validateNextState(targetState)) {
return targetState;
}
throw new IllegalStateException("Unexpected transfer state " + v + "while setting target to" + targetState);
});
}

@Override
Expand Down Expand Up @@ -99,33 +127,105 @@ public void uploadBlobs(
ActionListener<TransferFileSnapshot> listener,
WritePriority writePriority
) {
boolean isObjectMetadataUploadSupported = isObjectMetadataUploadSupported();

fileSnapshots.forEach(fileSnapshot -> {
BlobPath blobPath = blobPaths.get(fileSnapshot.getPrimaryTerm());
if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) {
uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority);
} else {
logger.info("uploading file = {}", fileSnapshot.getName());
uploadBlob(fileSnapshot, listener, blobPath, writePriority);
}
});

}
if (!isObjectMetadataUploadSupported) {
List<Exception> exceptionList = new ArrayList<>(2);

private Map<String, String> prepareFileMetadata(TransferFileSnapshot fileSnapshot) throws IOException {
if (!(fileSnapshot instanceof FileSnapshot.TranslogFileSnapshot)) {
return null;
}
Set<TransferFileSnapshot> filesToUpload = new HashSet<>(2);
try {
if (!fileTransferTrackerCache.get(fileSnapshot.getName()).equals(FileTransferTracker.TransferState.SUCCESS)) {
filesToUpload.add(
new TransferFileSnapshot(fileSnapshot.getPath(), fileSnapshot.getPrimaryTerm(), fileSnapshot.getChecksum())
);
}
if (!fileTransferTrackerCache.get(fileSnapshot.getMetadataFileName())
.equals(FileTransferTracker.TransferState.FAILED)) {
filesToUpload.add(
new TransferFileSnapshot(
fileSnapshot.getMetadataFilePath(),
fileSnapshot.getPrimaryTerm(),
fileSnapshot.getMetadataFileChecksum()
)
);
}
} catch (IOException e) {
throw new FileTransferException(fileSnapshot, e);
}

if (filesToUpload.isEmpty()) {
listener.onResponse(fileSnapshot);
return;
}

final CountDownLatch latch = new CountDownLatch(filesToUpload.size());
LatchedActionListener<TransferFileSnapshot> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(fileSnapshot1 -> add(fileSnapshot1.getName(), true), ex -> {
assert ex instanceof FileTransferException;
logger.error(
() -> new ParameterizedMessage(
"Exception during transfer for file {}",
((FileTransferException) ex).getFileSnapshot().getName()
),
ex
);
FileTransferException e = (FileTransferException) ex;
TransferFileSnapshot file = e.getFileSnapshot();
add(file.getName(), false);
exceptionList.add(ex);
}),
latch
);

filesToUpload.forEach(separateFileSnapshot -> {
if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) {
uploadBlob(
ThreadPool.Names.TRANSLOG_TRANSFER,
separateFileSnapshot,
blobPath,
latchedActionListener,
writePriority
);
} else {
logger.info("uploading file = {}", fileSnapshot.getName());
uploadBlob(separateFileSnapshot, latchedActionListener, blobPath, writePriority);
}
});

try {
if (latch.await(300, TimeUnit.SECONDS) == false) {
Exception ex = new TranslogUploadFailedException(
"Timed out waiting for transfer of snapshot " + fileSnapshot + " to complete"
);
throw new FileTransferException(fileSnapshot, ex);
}
} catch (InterruptedException ex) {
Exception exception = new TranslogUploadFailedException("Failed to upload " + fileSnapshot, ex);
Thread.currentThread().interrupt();
throw new FileTransferException(fileSnapshot, exception);
}

FileSnapshot.TranslogFileSnapshot tlogFileSnapshot = (FileSnapshot.TranslogFileSnapshot) fileSnapshot;
String ckpAsString = tlogFileSnapshot.provideCheckpointDataAsString();
Long checkpointChecksum = tlogFileSnapshot.getCheckpointChecksum();
if (fileTransferTrackerCache.get(fileSnapshot.getName()).equals(FileTransferTracker.TransferState.SUCCESS)
&& fileTransferTrackerCache.get(fileSnapshot.getMetadataFileName()).equals(FileTransferTracker.TransferState.SUCCESS)) {
listener.onResponse(fileSnapshot);
} else {
assert exceptionList.isEmpty() == false;
listener.onFailure(exceptionList.get(0));
}

assert checkpointChecksum != null : "checksum can not be null";
} else {
if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) {
uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority);
} else {
logger.info("uploading file = {}", fileSnapshot.getName());
uploadBlob(fileSnapshot, listener, blobPath, writePriority);
}
}
});

Map<String, String> metadata = new HashMap<>();
metadata.put(FileSnapshot.TranslogFileSnapshot.CHECKPOINT_FILE_DATA_KEY, ckpAsString);
metadata.put(FileSnapshot.TranslogFileSnapshot.CHECKPOINT_FILE_CHECKSUM_KEY, checkpointChecksum.toString());
return metadata;
}

private void uploadBlob(
Expand All @@ -138,7 +238,7 @@ private void uploadBlob(
try {
Map<String, String> metadata = null;
if (isObjectMetadataUploadSupported()) {
metadata = prepareFileMetadata(fileSnapshot);
metadata = fileSnapshot.prepareFileMetadata();
}

ChannelFactory channelFactory = FileChannel::open;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
Expand Down Expand Up @@ -110,18 +112,36 @@ public static class TransferFileSnapshot extends FileSnapshot {

private final long primaryTerm;
private Long checksum;

public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException {
@Nullable
private Path metadataFilePath;
@Nullable
private Long metadataFileChecksum;
@Nullable
private String metadataFileName;

public TransferFileSnapshot(Path path, long primaryTerm, Long checksum, Path metadataFilePath, Long metadataFileChecksum)
throws IOException {
super(path);
this.primaryTerm = primaryTerm;
this.checksum = checksum;
Objects.requireNonNull(metadataFilePath);
Objects.requireNonNull(metadataFileChecksum);
this.metadataFilePath = metadataFilePath;
this.metadataFileChecksum = metadataFileChecksum;
this.metadataFileName = metadataFilePath.getFileName().toString();
}

public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throws IOException {
super(name, content);
this.primaryTerm = primaryTerm;
}

public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException {
super(path);
this.primaryTerm = primaryTerm;
this.checksum = checksum;
}

public Long getChecksum() {
return checksum;
}
Expand All @@ -130,6 +150,56 @@ public long getPrimaryTerm() {
return primaryTerm;
}

public Path getMetadataFilePath() {
return metadataFilePath;
}

public Long getMetadataFileChecksum() {
return metadataFileChecksum;
}

public long getMinTranslogGeneration() {
return getMinTranslogGeneration();
}

public String getMetadataFileName() {
return metadataFileName;
}

public Map<String, String> prepareFileMetadata() throws IOException {
if (!(this instanceof TranslogCheckpointFileSnapshot)) {
return null;
}

String ckpAsString = provideCheckpointDataAsString();
Long checkpointChecksum = metadataFileChecksum;

assert checkpointChecksum != null : "checksum can not be null";

Map<String, String> metadata = new HashMap<>();
metadata.put(TranslogFileSnapshot.CHECKPOINT_FILE_DATA_KEY, ckpAsString);
metadata.put(TranslogFileSnapshot.CHECKPOINT_FILE_CHECKSUM_KEY, checkpointChecksum.toString());
return metadata;
}

public String provideCheckpointDataAsString() throws IOException {
return buildCheckpointDataAsBase64String(metadataFilePath);
}

static String buildCheckpointDataAsBase64String(Path checkpointFilePath) throws IOException {
long fileSize = Files.size(checkpointFilePath);
assert fileSize < 1500 : "checkpoint file size is more than 1.5KB size, can't be stored as metadata";
byte[] fileBytes = Files.readAllBytes(checkpointFilePath);
return Base64.getEncoder().encodeToString(fileBytes);
}

public static byte[] convertBase64StringToCheckpointFileDataBytes(String base64CheckpointString) {
if (base64CheckpointString == null) {
return null;
}
return Base64.getDecoder().decode(base64CheckpointString);
}

@Override
public int hashCode() {
return Objects.hash(primaryTerm, super.hashCode());
Expand Down Expand Up @@ -259,4 +329,34 @@ public boolean equals(Object o) {
return false;
}
}

/**
* Single snapshot of combined translog.tlog and translog.ckp files that gets transferred
*
* @opensearch.internal
*/
public static final class TranslogCheckpointFileSnapshot extends TransferFileSnapshot {

private final long generation;
private final long minTranslogGeneration;

public TranslogCheckpointFileSnapshot(
Path tlogFilepath,
long primaryTerm,
Long tlogFilechecksum,
long generation,
long minTranslogGeneration,
Path ckpFilePath,
Long ckpFileChecksum
) throws IOException {
super(tlogFilepath, primaryTerm, tlogFilechecksum, ckpFilePath, ckpFileChecksum);
this.minTranslogGeneration = minTranslogGeneration;
this.generation = generation;
}

public long getGeneration() {
return generation;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) {
}

add(fileSnapshot.getName(), TransferState.SUCCESS);
add(fileSnapshot.getMetadataFileName(), TransferState.SUCCESS);
}

void add(String file, boolean success) {
Expand Down Expand Up @@ -130,7 +131,7 @@ public Set<String> allUploaded() {
/**
* Represents the state of the upload operation
*/
private enum TransferState {
enum TransferState {
SUCCESS,
FAILED;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Interface for the translog transfer service responsible for interacting with a remote store
Expand Down Expand Up @@ -147,6 +148,8 @@ void listAllInSortedOrderAsync(
ActionListener<List<BlobMetadata>> listener
);

ConcurrentHashMap<String, FileTransferTracker.TransferState> getFileTransferTrackerCache();

boolean isObjectMetadataUploadSupported();

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import org.opensearch.index.translog.transfer.FileSnapshot.TranslogCheckpointFileSnapshot;
import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;

import java.util.Set;
Expand Down Expand Up @@ -39,4 +40,10 @@ public interface TransferSnapshot {
* @return the translog transfer metadata
*/
TranslogTransferMetadata getTranslogTransferMetadata();

/**
* The single snapshot of the translog and checkpoint generational files
* @return the set of {@link TranslogCheckpointFileSnapshot}
*/
Set<TransferFileSnapshot> getTranslogCheckpointFileSnapshots();
}
Loading

0 comments on commit 84f9c92

Please sign in to comment.