Skip to content

Commit

Permalink
track translog files upload for a generation
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed Apr 25, 2024
1 parent 34095a3 commit df169f0
Show file tree
Hide file tree
Showing 14 changed files with 365 additions and 358 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ static long urlBase64ToLong(String base64Str) {
return ByteBuffer.wrap(hashBytes).getLong();
}

public static byte[] convertBase64StringToBytesArray(String base64String) {
public static byte[] getBytesArrayFromBase64String(String base64String) {
if (base64String == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public final class Checkpoint {
* @param trimmedAboveSeqNo all operations with seq# above trimmedAboveSeqNo should be ignored and not read from the
* corresponding translog file. {@link SequenceNumbers#UNASSIGNED_SEQ_NO} is used to disable trimming.
*/
public Checkpoint(
Checkpoint(
long offset,
int numOps,
long generation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
}

// Visible for testing
public Set<String> allUploaded() {
public Set<Long> allUploaded() {
return fileTransferTracker.allUploaded();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.lucene.store.InputStreamIndexInput;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.translog.Checkpoint;

import java.io.BufferedInputStream;
import java.io.Closeable;
Expand Down Expand Up @@ -64,10 +63,6 @@ public String getName() {
return name;
}

public byte[] getContent() {
return content;
}

public long getContentLength() throws IOException {
return fileChannel == null ? content.length : fileChannel.size();
}
Expand Down Expand Up @@ -116,32 +111,48 @@ public static class TransferFileSnapshot extends FileSnapshot {
private final long primaryTerm;
private Long checksum;
@Nullable
private String checkpointFileName;
private long generation;
@Nullable
private Map<String, String> metadata;
private long minTranslogGeneration;
@Nullable
private Path ckpFilePath;
@Nullable
private Long ckpFileChecksum;
@Nullable
private Checkpoint checkpoint;
private Map<String, String> metadata;

public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException {
public TransferFileSnapshot(Path path, long primaryTerm, long generation, Long checksum) throws IOException {
super(path);
this.primaryTerm = primaryTerm;
this.checksum = checksum;
this.generation = generation;
}

public TransferFileSnapshot(Path path, long primaryTerm, Long checksum, Checkpoint checkpoint) throws IOException {
public TransferFileSnapshot(
long primaryTerm,
long generation,
long minTranslogGeneration,
Path path,
Long checksum,
Path ckpFilePath,
Long ckpFileChecksum
) throws IOException {
super(path);
this.primaryTerm = primaryTerm;
this.checksum = checksum;
this.checkpoint = checkpoint;
this.generation = generation;
this.minTranslogGeneration = minTranslogGeneration;
this.ckpFilePath = ckpFilePath;
this.ckpFileChecksum = ckpFileChecksum;
}

public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throws IOException {
super(name, content);
this.primaryTerm = primaryTerm;
}

public void setCheckpointFileName(String name) {
this.checkpointFileName = name;
public CheckpointFileSnapshot provideCheckpointFileSnapshot() throws IOException {
return new CheckpointFileSnapshot(primaryTerm, generation, minTranslogGeneration, ckpFilePath, ckpFileChecksum);
}

public Long getChecksum() {
Expand All @@ -152,20 +163,39 @@ public long getPrimaryTerm() {
return primaryTerm;
}

public void setTransferFileSnapshotMetadata(Map<String, String> metadata) {
this.metadata = metadata;
public Map<String, String> getTransferFileSnapshotMetadata() {
return metadata;
}

public String getCheckpointFileName() {
return checkpointFileName;
public long getGeneration() {
return generation;
}

public Map<String, String> getTransferFileSnapshotMetadata() {
return metadata;
public long getMinTranslogGeneration() {
return minTranslogGeneration;
}

public Checkpoint getCheckpoint() {
return checkpoint;
public Path getCkpFilePath() {
return ckpFilePath;
}

public Long getCkpFileChecksum() {
return ckpFileChecksum;
}

public long getCkpFileContentLength() throws IOException {
FileChannel fileChannel = FileChannel.open(ckpFilePath, StandardOpenOption.READ);
long ckpFileSize = fileChannel.size();
fileChannel.close();
return ckpFileSize;
}

public String getCkpFileName() {
return ckpFilePath.getFileName().toString();
}

public void setTransferFileSnapshotMetadata(Map<String, String> metadata) {
this.metadata = metadata;
}

@Override
Expand All @@ -186,62 +216,76 @@ public boolean equals(Object o) {
}

/**
* Snapshot of a single .tlg file that gets transferred
* Snapshot of a single .ckp file that gets transferred
*
* @opensearch.internal
*/
public static final class TranslogFileSnapshot extends TransferFileSnapshot {
public static final class CheckpointFileSnapshot extends TransferFileSnapshot {

private final long generation;

public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum) throws IOException {
super(path, primaryTerm, checksum);
private final long minTranslogGeneration;

public CheckpointFileSnapshot(long primaryTerm, long generation, long minTranslogGeneration, Path path, Long checksum)
throws IOException {
super(path, primaryTerm, generation, checksum);
this.minTranslogGeneration = minTranslogGeneration;
this.generation = generation;
}

public long getGeneration() {
return generation;
}

public long getMinTranslogGeneration() {
return minTranslogGeneration;
}

@Override
public int hashCode() {
return Objects.hash(generation, super.hashCode());
return Objects.hash(generation, minTranslogGeneration, super.hashCode());
}

@Override
public boolean equals(Object o) {
if (super.equals(o)) {
if (this == o) return true;
if (getClass() != o.getClass()) return false;
TranslogFileSnapshot other = (TranslogFileSnapshot) o;
return Objects.equals(this.generation, other.generation);
CheckpointFileSnapshot other = (CheckpointFileSnapshot) o;
return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration)
&& Objects.equals(this.generation, other.generation);
}
return false;
}
}

/**
* Snapshot of a single .ckp file that gets transferred
* Single snapshot of combined translog.tlog and translog.ckp files that gets transferred
*
* @opensearch.internal
*/
public static final class CheckpointFileSnapshot extends TransferFileSnapshot {
public static final class TranslogAndCheckpointFileSnapshot extends TransferFileSnapshot {

private final long primaryTerm;
private final long generation;

private final long minTranslogGeneration;
private final long ckpGeneration;

public CheckpointFileSnapshot(
public TranslogAndCheckpointFileSnapshot(
long primaryTerm,
long generation,
long minTranslogGeneration,
Path path,
Long checksum,
Checkpoint checkpoint
Path tlogFilePath,
Long tlogFilechecksum,
Path ckpFilePath,
Long ckpFileChecksum,
Long ckpGeneration
) throws IOException {
super(path, primaryTerm, checksum, checkpoint);
this.minTranslogGeneration = minTranslogGeneration;
super(primaryTerm, generation, minTranslogGeneration, tlogFilePath, tlogFilechecksum, ckpFilePath, ckpFileChecksum);
this.primaryTerm = primaryTerm;
this.generation = generation;
this.minTranslogGeneration = minTranslogGeneration;
this.ckpGeneration = ckpGeneration;
}

public long getGeneration() {
Expand All @@ -252,21 +296,26 @@ public long getMinTranslogGeneration() {
return minTranslogGeneration;
}

public long getCkpGeneration() {
return ckpGeneration;
}

@Override
public int hashCode() {
return Objects.hash(generation, minTranslogGeneration, super.hashCode());
return Objects.hash(primaryTerm, generation, super.hashCode());
}

@Override
public boolean equals(Object o) {
if (super.equals(o)) {
if (this == o) return true;
if (getClass() != o.getClass()) return false;
CheckpointFileSnapshot other = (CheckpointFileSnapshot) o;
return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration)
&& Objects.equals(this.generation, other.generation);
TranslogAndCheckpointFileSnapshot other = (TranslogAndCheckpointFileSnapshot) o;
return Objects.equals(this.primaryTerm, other.primaryTerm) && Objects.equals(this.generation, other.generation);
}
return false;
}

}

}
Loading

0 comments on commit df169f0

Please sign in to comment.