Skip to content

Commit

Permalink
refactor: downgrade the source level of s3stream to java11 (#901)
Browse files Browse the repository at this point in the history
Signed-off-by: daniel-y <[email protected]>
  • Loading branch information
daniel-y authored Jan 23, 2024
1 parent 4277d95 commit ffdc0de
Show file tree
Hide file tree
Showing 25 changed files with 912 additions and 46 deletions.
2 changes: 1 addition & 1 deletion s3stream/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ dependencies {
}

group = 'com.automq.elasticstream'
version = '0.18.0-SNAPSHOT'
description = 's3stream'
java.sourceCompatibility = '11'

java {
withSourcesJar()
Expand Down
4 changes: 2 additions & 2 deletions s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
<s3.version>2.20.127</s3.version>

<maven-checkstyle-plugin.version>3.2.0</maven-checkstyle-plugin.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<opentelemetry.version>1.32.0</opentelemetry.version>
<aspectj.version>1.9.20.1</aspectj.version>
Expand Down
76 changes: 74 additions & 2 deletions s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,28 @@
package com.automq.stream.s3;

import io.netty.buffer.ByteBuf;
import java.util.Objects;

public record DataBlockIndex(long streamId, long startOffset, int endOffsetDelta, int recordCount, long startPosition,
int size) {
public final class DataBlockIndex {

public static final int BLOCK_INDEX_SIZE = 8/* streamId */ + 8 /* startOffset */ + 4 /* endOffset delta */
+ 4 /* record count */ + 8 /* block position */ + 4 /* block size */;
private final long streamId;
private final long startOffset;
private final int endOffsetDelta;
private final int recordCount;
private final long startPosition;
private final int size;

public DataBlockIndex(long streamId, long startOffset, int endOffsetDelta, int recordCount, long startPosition,
int size) {
this.streamId = streamId;
this.startOffset = startOffset;
this.endOffsetDelta = endOffsetDelta;
this.recordCount = recordCount;
this.startPosition = startPosition;
this.size = size;
}

public long endOffset() {
return startOffset + endOffsetDelta;
Expand All @@ -41,4 +57,60 @@ public void encode(ByteBuf buf) {
buf.writeLong(startPosition);
buf.writeInt(size);
}

public long streamId() {
return streamId;
}

public long startOffset() {
return startOffset;
}

public int endOffsetDelta() {
return endOffsetDelta;
}

public int recordCount() {
return recordCount;
}

public long startPosition() {
return startPosition;
}

public int size() {
return size;
}

@Override
public boolean equals(Object obj) {
if (obj == this)
return true;
if (obj == null || obj.getClass() != this.getClass())
return false;
var that = (DataBlockIndex) obj;
return this.streamId == that.streamId &&
this.startOffset == that.startOffset &&
this.endOffsetDelta == that.endOffsetDelta &&
this.recordCount == that.recordCount &&
this.startPosition == that.startPosition &&
this.size == that.size;
}

@Override
public int hashCode() {
return Objects.hash(streamId, startOffset, endOffsetDelta, recordCount, startPosition, size);
}

@Override
public String toString() {
return "DataBlockIndex[" +
"streamId=" + streamId + ", " +
"startOffset=" + startOffset + ", " +
"endOffsetDelta=" + endOffsetDelta + ", " +
"recordCount=" + recordCount + ", " +
"startPosition=" + startPosition + ", " +
"size=" + size + ']';
}

}
107 changes: 102 additions & 5 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
Expand Down Expand Up @@ -121,10 +122,20 @@ public void close0() {
}

/**
* @param dataBlockSize The total size of the data blocks, which equals to index start position.
* @param indexBlock raw index data.
*
*/
public record BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock) {
public static final class BasicObjectInfo {
private final long dataBlockSize;
private final IndexBlock indexBlock;

/**
* @param dataBlockSize The total size of the data blocks, which equals to index start position.
* @param indexBlock raw index data.
*/
public BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock) {
this.dataBlockSize = dataBlockSize;
this.indexBlock = indexBlock;
}

public static BasicObjectInfo parse(ByteBuf objectTailBuf,
S3ObjectMetadata s3ObjectMetadata) throws IndexBlockParseException {
Expand Down Expand Up @@ -154,6 +165,38 @@ public int size() {
void close() {
indexBlock.close();
}

public long dataBlockSize() {
return dataBlockSize;
}

public IndexBlock indexBlock() {
return indexBlock;
}

@Override
public boolean equals(Object obj) {
if (obj == this)
return true;
if (obj == null || obj.getClass() != this.getClass())
return false;
var that = (BasicObjectInfo) obj;
return this.dataBlockSize == that.dataBlockSize &&
Objects.equals(this.indexBlock, that.indexBlock);
}

@Override
public int hashCode() {
return Objects.hash(dataBlockSize, indexBlock);
}

@Override
public String toString() {
return "BasicObjectInfo[" +
"dataBlockSize=" + dataBlockSize + ", " +
"indexBlock=" + indexBlock + ']';
}

}

public static class IndexBlock {
Expand Down Expand Up @@ -261,8 +304,62 @@ void close() {
}
}

public record FindIndexResult(boolean isFulfilled, long nextStartOffset, int nextMaxBytes,
List<StreamDataBlock> streamDataBlocks) {
public static final class FindIndexResult {
private final boolean isFulfilled;
private final long nextStartOffset;
private final int nextMaxBytes;
private final List<StreamDataBlock> streamDataBlocks;

public FindIndexResult(boolean isFulfilled, long nextStartOffset, int nextMaxBytes,
List<StreamDataBlock> streamDataBlocks) {
this.isFulfilled = isFulfilled;
this.nextStartOffset = nextStartOffset;
this.nextMaxBytes = nextMaxBytes;
this.streamDataBlocks = streamDataBlocks;
}

public boolean isFulfilled() {
return isFulfilled;
}

public long nextStartOffset() {
return nextStartOffset;
}

public int nextMaxBytes() {
return nextMaxBytes;
}

public List<StreamDataBlock> streamDataBlocks() {
return streamDataBlocks;
}

@Override
public boolean equals(Object obj) {
if (obj == this)
return true;
if (obj == null || obj.getClass() != this.getClass())
return false;
var that = (FindIndexResult) obj;
return this.isFulfilled == that.isFulfilled &&
this.nextStartOffset == that.nextStartOffset &&
this.nextMaxBytes == that.nextMaxBytes &&
Objects.equals(this.streamDataBlocks, that.streamDataBlocks);
}

@Override
public int hashCode() {
return Objects.hash(isFulfilled, nextStartOffset, nextMaxBytes, streamDataBlocks);
}

@Override
public String toString() {
return "FindIndexResult[" +
"isFulfilled=" + isFulfilled + ", " +
"nextStartOffset=" + nextStartOffset + ", " +
"nextMaxBytes=" + nextMaxBytes + ", " +
"streamDataBlocks=" + streamDataBlocks + ']';
}

}

Expand Down
36 changes: 35 additions & 1 deletion s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,15 @@ synchronized private long calculate() {
* Wrapper of {@link WalWriteRequest}.
* When the {@code request} is null, it is used as a flag.
*/
record WalWriteRequestWrapper(WalWriteRequest request) {
static final class WalWriteRequestWrapper {
private final WalWriteRequest request;

/**
*
*/
WalWriteRequestWrapper(WalWriteRequest request) {
this.request = request;
}

static WalWriteRequestWrapper flag() {
return new WalWriteRequestWrapper(null);
Expand All @@ -745,6 +753,32 @@ static WalWriteRequestWrapper flag() {
public boolean isFlag() {
return request == null;
}

public WalWriteRequest request() {
return request;
}

@Override
public boolean equals(Object obj) {
if (obj == this)
return true;
if (obj == null || obj.getClass() != this.getClass())
return false;
var that = (WalWriteRequestWrapper) obj;
return Objects.equals(this.request, that.request);
}

@Override
public int hashCode() {
return Objects.hash(request);
}

@Override
public String toString() {
return "WalWriteRequestWrapper[" +
"request=" + request + ']';
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -391,7 +392,45 @@ public interface CacheEvictListener {
void onCacheEvict(long streamId, long startOffset, long endOffset, int size);
}

record CacheBlockKey(long streamId, long startOffset) {
static final class CacheBlockKey {
private final long streamId;
private final long startOffset;

CacheBlockKey(long streamId, long startOffset) {
this.streamId = streamId;
this.startOffset = startOffset;
}

public long streamId() {
return streamId;
}

public long startOffset() {
return startOffset;
}

@Override
public boolean equals(Object obj) {
if (obj == this)
return true;
if (obj == null || obj.getClass() != this.getClass())
return false;
var that = (CacheBlockKey) obj;
return this.streamId == that.streamId &&
this.startOffset == that.startOffset;
}

@Override
public int hashCode() {
return Objects.hash(streamId, startOffset);
}

@Override
public String toString() {
return "CacheBlockKey[" +
"streamId=" + streamId + ", " +
"startOffset=" + startOffset + ']';
}

}

Expand Down
Loading

0 comments on commit ffdc0de

Please sign in to comment.