diff --git a/s3stream/build.gradle b/s3stream/build.gradle index 8ad1e062f..88027458a 100644 --- a/s3stream/build.gradle +++ b/s3stream/build.gradle @@ -36,8 +36,8 @@ dependencies { } group = 'com.automq.elasticstream' -version = '0.18.0-SNAPSHOT' description = 's3stream' +java.sourceCompatibility = '11' java { withSourcesJar() diff --git a/s3stream/pom.xml b/s3stream/pom.xml index b6518b2e4..d74a54ff4 100644 --- a/s3stream/pom.xml +++ b/s3stream/pom.xml @@ -31,8 +31,8 @@ 2.20.127 3.2.0 - 17 - 17 + 11 + 11 UTF-8 1.32.0 1.9.20.1 diff --git a/s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java b/s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java index 71c669d2e..830561675 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java +++ b/s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java @@ -18,12 +18,28 @@ package com.automq.stream.s3; import io.netty.buffer.ByteBuf; +import java.util.Objects; -public record DataBlockIndex(long streamId, long startOffset, int endOffsetDelta, int recordCount, long startPosition, - int size) { +public final class DataBlockIndex { public static final int BLOCK_INDEX_SIZE = 8/* streamId */ + 8 /* startOffset */ + 4 /* endOffset delta */ + 4 /* record count */ + 8 /* block position */ + 4 /* block size */; + private final long streamId; + private final long startOffset; + private final int endOffsetDelta; + private final int recordCount; + private final long startPosition; + private final int size; + + public DataBlockIndex(long streamId, long startOffset, int endOffsetDelta, int recordCount, long startPosition, + int size) { + this.streamId = streamId; + this.startOffset = startOffset; + this.endOffsetDelta = endOffsetDelta; + this.recordCount = recordCount; + this.startPosition = startPosition; + this.size = size; + } public long endOffset() { return startOffset + endOffsetDelta; @@ -41,4 +57,60 @@ public void encode(ByteBuf buf) { buf.writeLong(startPosition); buf.writeInt(size); } + + public long streamId() { + return streamId; + } + + public long startOffset() { + return startOffset; + } + + public int endOffsetDelta() { + return endOffsetDelta; + } + + public int recordCount() { + return recordCount; + } + + public long startPosition() { + return startPosition; + } + + public int size() { + return size; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (DataBlockIndex) obj; + return this.streamId == that.streamId && + this.startOffset == that.startOffset && + this.endOffsetDelta == that.endOffsetDelta && + this.recordCount == that.recordCount && + this.startPosition == that.startPosition && + this.size == that.size; + } + + @Override + public int hashCode() { + return Objects.hash(streamId, startOffset, endOffsetDelta, recordCount, startPosition, size); + } + + @Override + public String toString() { + return "DataBlockIndex[" + + "streamId=" + streamId + ", " + + "startOffset=" + startOffset + ", " + + "endOffsetDelta=" + endOffsetDelta + ", " + + "recordCount=" + recordCount + ", " + + "startPosition=" + startPosition + ", " + + "size=" + size + ']'; + } + } diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java index 5e9001250..fc6c82472 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java @@ -28,6 +28,7 @@ import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; @@ -121,10 +122,20 @@ public void close0() { } /** - * @param dataBlockSize The total size of the data blocks, which equals to index start position. - * @param indexBlock raw index data. + * */ - public record BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock) { + public static final class BasicObjectInfo { + private final long dataBlockSize; + private final IndexBlock indexBlock; + + /** + * @param dataBlockSize The total size of the data blocks, which equals to index start position. + * @param indexBlock raw index data. + */ + public BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock) { + this.dataBlockSize = dataBlockSize; + this.indexBlock = indexBlock; + } public static BasicObjectInfo parse(ByteBuf objectTailBuf, S3ObjectMetadata s3ObjectMetadata) throws IndexBlockParseException { @@ -154,6 +165,38 @@ public int size() { void close() { indexBlock.close(); } + + public long dataBlockSize() { + return dataBlockSize; + } + + public IndexBlock indexBlock() { + return indexBlock; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (BasicObjectInfo) obj; + return this.dataBlockSize == that.dataBlockSize && + Objects.equals(this.indexBlock, that.indexBlock); + } + + @Override + public int hashCode() { + return Objects.hash(dataBlockSize, indexBlock); + } + + @Override + public String toString() { + return "BasicObjectInfo[" + + "dataBlockSize=" + dataBlockSize + ", " + + "indexBlock=" + indexBlock + ']'; + } + } public static class IndexBlock { @@ -261,8 +304,62 @@ void close() { } } - public record FindIndexResult(boolean isFulfilled, long nextStartOffset, int nextMaxBytes, - List streamDataBlocks) { + public static final class FindIndexResult { + private final boolean isFulfilled; + private final long nextStartOffset; + private final int nextMaxBytes; + private final List streamDataBlocks; + + public FindIndexResult(boolean isFulfilled, long nextStartOffset, int nextMaxBytes, + List streamDataBlocks) { + this.isFulfilled = isFulfilled; + this.nextStartOffset = nextStartOffset; + this.nextMaxBytes = nextMaxBytes; + this.streamDataBlocks = streamDataBlocks; + } + + public boolean isFulfilled() { + return isFulfilled; + } + + public long nextStartOffset() { + return nextStartOffset; + } + + public int nextMaxBytes() { + return nextMaxBytes; + } + + public List streamDataBlocks() { + return streamDataBlocks; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (FindIndexResult) obj; + return this.isFulfilled == that.isFulfilled && + this.nextStartOffset == that.nextStartOffset && + this.nextMaxBytes == that.nextMaxBytes && + Objects.equals(this.streamDataBlocks, that.streamDataBlocks); + } + + @Override + public int hashCode() { + return Objects.hash(isFulfilled, nextStartOffset, nextMaxBytes, streamDataBlocks); + } + + @Override + public String toString() { + return "FindIndexResult[" + + "isFulfilled=" + isFulfilled + ", " + + "nextStartOffset=" + nextStartOffset + ", " + + "nextMaxBytes=" + nextMaxBytes + ", " + + "streamDataBlocks=" + streamDataBlocks + ']'; + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 84bca7ac5..7d7681f1b 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -736,7 +736,15 @@ synchronized private long calculate() { * Wrapper of {@link WalWriteRequest}. * When the {@code request} is null, it is used as a flag. */ - record WalWriteRequestWrapper(WalWriteRequest request) { + static final class WalWriteRequestWrapper { + private final WalWriteRequest request; + + /** + * + */ + WalWriteRequestWrapper(WalWriteRequest request) { + this.request = request; + } static WalWriteRequestWrapper flag() { return new WalWriteRequestWrapper(null); @@ -745,6 +753,32 @@ static WalWriteRequestWrapper flag() { public boolean isFlag() { return request == null; } + + public WalWriteRequest request() { + return request; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (WalWriteRequestWrapper) obj; + return Objects.equals(this.request, that.request); + } + + @Override + public int hashCode() { + return Objects.hash(request); + } + + @Override + public String toString() { + return "WalWriteRequestWrapper[" + + "request=" + request + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java index d75ca18e6..49843f8d1 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Objects; import java.util.SortedMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -391,7 +392,45 @@ public interface CacheEvictListener { void onCacheEvict(long streamId, long startOffset, long endOffset, int size); } - record CacheBlockKey(long streamId, long startOffset) { + static final class CacheBlockKey { + private final long streamId; + private final long startOffset; + + CacheBlockKey(long streamId, long startOffset) { + this.streamId = streamId; + this.startOffset = startOffset; + } + + public long streamId() { + return streamId; + } + + public long startOffset() { + return startOffset; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (CacheBlockKey) obj; + return this.streamId == that.streamId && + this.startOffset == that.startOffset; + } + + @Override + public int hashCode() { + return Objects.hash(streamId, startOffset); + } + + @Override + public String toString() { + return "CacheBlockKey[" + + "streamId=" + streamId + ", " + + "startOffset=" + startOffset + ']'; + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java index 3162a2f0a..257ab5e85 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; @@ -89,6 +90,45 @@ public void readDataBlock(ObjectReader reader, DataBlockIndex blockIndex) { } } - public record ReserveResult(int reserveSize, CompletableFuture cf) { + public static final class ReserveResult { + private final int reserveSize; + private final CompletableFuture cf; + + public ReserveResult(int reserveSize, CompletableFuture cf) { + this.reserveSize = reserveSize; + this.cf = cf; + } + + public int reserveSize() { + return reserveSize; + } + + public CompletableFuture cf() { + return cf; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ReserveResult) obj; + return this.reserveSize == that.reserveSize && + Objects.equals(this.cf, that.cf); + } + + @Override + public int hashCode() { + return Objects.hash(reserveSize, cf); + } + + @Override + public String toString() { + return "ReserveResult[" + + "reserveSize=" + reserveSize + ", " + + "cf=" + cf + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java index d517dd10c..c123eddf8 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java @@ -34,6 +34,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -243,7 +244,45 @@ public enum ReadBlockCacheStatus { WAIT_THROTTLE, } - public record ReadAheadTaskKey(long streamId, long startOffset) { + public static final class ReadAheadTaskKey { + private final long streamId; + private final long startOffset; + + public ReadAheadTaskKey(long streamId, long startOffset) { + this.streamId = streamId; + this.startOffset = startOffset; + } + + public long streamId() { + return streamId; + } + + public long startOffset() { + return startOffset; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ReadAheadTaskKey) obj; + return this.streamId == that.streamId && + this.startOffset == that.startOffset; + } + + @Override + public int hashCode() { + return Objects.hash(streamId, startOffset); + } + + @Override + public String toString() { + return "ReadAheadTaskKey[" + + "streamId=" + streamId + ", " + + "startOffset=" + startOffset + ']'; + } } @@ -261,7 +300,21 @@ void setStatus(ReadBlockCacheStatus status) { } } - public record ReadTaskKey(long streamId, long startOffset, long endOffset, int maxBytes, UUID uuid) { + public static final class ReadTaskKey { + private final long streamId; + private final long startOffset; + private final long endOffset; + private final int maxBytes; + private final UUID uuid; + + public ReadTaskKey(long streamId, long startOffset, long endOffset, int maxBytes, UUID uuid) { + this.streamId = streamId; + this.startOffset = startOffset; + this.endOffset = endOffset; + this.maxBytes = maxBytes; + this.uuid = uuid; + } + @Override public String toString() { return "ReadTaskKey{" + @@ -272,6 +325,46 @@ public String toString() { ", uuid=" + uuid + '}'; } + + public long streamId() { + return streamId; + } + + public long startOffset() { + return startOffset; + } + + public long endOffset() { + return endOffset; + } + + public int maxBytes() { + return maxBytes; + } + + public UUID uuid() { + return uuid; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ReadTaskKey) obj; + return this.streamId == that.streamId && + this.startOffset == that.startOffset && + this.endOffset == that.endOffset && + this.maxBytes == that.maxBytes && + Objects.equals(this.uuid, that.uuid); + } + + @Override + public int hashCode() { + return Objects.hash(streamId, startOffset, endOffset, maxBytes, uuid); + } + } public static class ReadTaskContext { @@ -288,7 +381,38 @@ void setStatus(ReadBlockCacheStatus status) { } } - public record ReadAheadRecord(long nextRAOffset) { + public static final class ReadAheadRecord { + private final long nextRAOffset; + + public ReadAheadRecord(long nextRAOffset) { + this.nextRAOffset = nextRAOffset; + } + + public long nextRAOffset() { + return nextRAOffset; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ReadAheadRecord) obj; + return this.nextRAOffset == that.nextRAOffset; + } + + @Override + public int hashCode() { + return Objects.hash(nextRAOffset); + } + + @Override + public String toString() { + return "ReadAheadRecord[" + + "nextRAOffset=" + nextRAOffset + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/InflightReadThrottle.java b/s3stream/src/main/java/com/automq/stream/s3/cache/InflightReadThrottle.java index e19a1ea30..5ed00c71f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/InflightReadThrottle.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/InflightReadThrottle.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -147,6 +148,45 @@ public void run() { } } - record InflightReadItem(int readSize, CompletableFuture cf) { + static final class InflightReadItem { + private final int readSize; + private final CompletableFuture cf; + + InflightReadItem(int readSize, CompletableFuture cf) { + this.readSize = readSize; + this.cf = cf; + } + + public int readSize() { + return readSize; + } + + public CompletableFuture cf() { + return cf; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (InflightReadItem) obj; + return this.readSize == that.readSize && + Objects.equals(this.cf, that.cf); + } + + @Override + public int hashCode() { + return Objects.hash(readSize, cf); + } + + @Override + public String toString() { + return "InflightReadItem[" + + "readSize=" + readSize + ", " + + "cf=" + cf + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index ee4913a52..3bb037bec 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -313,7 +313,7 @@ public CompletableFuture forceSplitAll() { * @param streamMetadataList metadata of opened streams * @param objectMetadata stream set object to split * @param cfs List of CompletableFuture of StreamObject - * @return true if split succeed, false otherwise + * @return true if split succeed, false otherwise */ private boolean splitStreamSetObject(List streamMetadataList, S3ObjectMetadata objectMetadata, Collection> cfs) { diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionStats.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionStats.java index 3c1c4ab5b..51a032045 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionStats.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; public class CompactionStats { @@ -60,7 +61,45 @@ public Map getS3ObjectToCompactedObjectNumMap() { return s3ObjectToCompactedObjectNumMap; } - public record CompactionStreamRecord(int streamNumInStreamSet, int streamObjectNum) { + public static final class CompactionStreamRecord { + private final int streamNumInStreamSet; + private final int streamObjectNum; + + public CompactionStreamRecord(int streamNumInStreamSet, int streamObjectNum) { + this.streamNumInStreamSet = streamNumInStreamSet; + this.streamObjectNum = streamObjectNum; + } + + public int streamNumInStreamSet() { + return streamNumInStreamSet; + } + + public int streamObjectNum() { + return streamObjectNum; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (CompactionStreamRecord) obj; + return this.streamNumInStreamSet == that.streamNumInStreamSet && + this.streamObjectNum == that.streamObjectNum; + } + + @Override + public int hashCode() { + return Objects.hash(streamNumInStreamSet, streamObjectNum); + } + + @Override + public String toString() { + return "CompactionStreamRecord[" + + "streamNumInStreamSet=" + streamNumInStreamSet + ", " + + "streamObjectNum=" + streamObjectNum + ']'; + } } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java index 5cc780ef2..775a58e57 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java @@ -189,7 +189,7 @@ private void readContinuousBlocks0(List streamDataBlocks) { private CompletableFuture rangeRead(long start, long end) { return rangeRead0(start, end).whenComplete((ret, ex) -> - CompactionStats.getInstance().compactionReadSizeStats.add(MetricsLevel.INFO, ret.readableBytes())); + CompactionStats.getInstance().compactionReadSizeStats.add(MetricsLevel.INFO, ret.readableBytes())); } private CompletableFuture rangeRead0(long start, long end) { diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/utils/CompactionUtils.java b/s3stream/src/main/java/com/automq/stream/s3/compact/utils/CompactionUtils.java index 7b633c84e..341a2926a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/utils/CompactionUtils.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/utils/CompactionUtils.java @@ -116,7 +116,7 @@ public static Map> blockWaitObjectIndices(List sortStreamRangePositions(Map> streamDataBlocksMap) { //TODO: use merge sort @@ -135,7 +135,7 @@ public static List sortStreamRangePositions(Map> groupStreamDataBlocks(List streamDataBlocks, Predicate predicate) { @@ -156,7 +156,8 @@ public static List> groupStreamDataBlocks(List buildObjectStreamRangeFromGroup(List> streamDataBlockGroup) { + public static List buildObjectStreamRangeFromGroup( + List> streamDataBlockGroup) { List objectStreamRanges = new ArrayList<>(); for (List streamDataBlocks : streamDataBlockGroup) { @@ -174,7 +175,8 @@ public static List buildObjectStreamRangeFromGroup(List buildDataBlockIndicesFromGroup(List> streamDataBlockGroup) { + public static List buildDataBlockIndicesFromGroup( + List> streamDataBlockGroup) { List dataBlockIndices = new ArrayList<>(); long blockStartPosition = 0; diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 11dba5a2a..10d3e1c34 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -268,14 +268,14 @@ public static void registerNetworkLimiterSupplier(AsyncNetworkBandwidthLimiter.T Supplier networkAvailableBandwidthSupplier, Supplier networkLimiterQueueSizeSupplier) { switch (type) { - case INBOUND -> { + case INBOUND: S3StreamMetricsManager.networkInboundAvailableBandwidthSupplier = networkAvailableBandwidthSupplier; S3StreamMetricsManager.networkInboundLimiterQueueSizeSupplier = networkLimiterQueueSizeSupplier; - } - case OUTBOUND -> { + break; + case OUTBOUND: S3StreamMetricsManager.networkOutboundAvailableBandwidthSupplier = networkAvailableBandwidthSupplier; S3StreamMetricsManager.networkOutboundLimiterQueueSizeSupplier = networkLimiterQueueSizeSupplier; - } + break; } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java index 529528751..1435c8189 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java @@ -25,7 +25,7 @@ public class HistogramMetric extends ConfigurableMetrics { private final LongHistogram longHistogram; - public HistogramMetric(MetricsConfig metricsConfig,LongHistogram longHistogram) { + public HistogramMetric(MetricsConfig metricsConfig, LongHistogram longHistogram) { this(metricsConfig, Attributes.empty(), longHistogram); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java index 4208f450d..8a2c36d48 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java @@ -167,10 +167,58 @@ public String getName() { } } - record BucketItem(int priority, long size, CompletableFuture cf) implements Comparable { + static final class BucketItem implements Comparable { + private final int priority; + private final long size; + private final CompletableFuture cf; + + BucketItem(int priority, long size, CompletableFuture cf) { + this.priority = priority; + this.size = size; + this.cf = cf; + } + @Override public int compareTo(BucketItem o) { return Long.compare(priority, o.priority); } + + public int priority() { + return priority; + } + + public long size() { + return size; + } + + public CompletableFuture cf() { + return cf; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (BucketItem) obj; + return this.priority == that.priority && + this.size == that.size && + Objects.equals(this.cf, that.cf); + } + + @Override + public int hashCode() { + return Objects.hash(priority, size, cf); + } + + @Override + public String toString() { + return "BucketItem[" + + "priority=" + priority + ", " + + "size=" + size + ", " + + "cf=" + cf + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index 3c489fc2d..8de9e8cd5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -47,6 +47,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -176,7 +177,8 @@ private static boolean checkPartNumbers(CompletedMultipartUpload multipartUpload private static boolean isUnrecoverable(Throwable ex) { ex = cause(ex); - if (ex instanceof S3Exception s3Ex) { + if (ex instanceof S3Exception) { + S3Exception s3Ex = (S3Exception) ex; return s3Ex.statusCode() == HttpStatusCode.FORBIDDEN || s3Ex.statusCode() == HttpStatusCode.NOT_FOUND; } return false; @@ -808,7 +810,61 @@ void handleReadCompleted(ByteBuf rst, Throwable ex) { } } - record ReadTask(String path, long start, long end, CompletableFuture cf) { + static final class ReadTask { + private final String path; + private final long start; + private final long end; + private final CompletableFuture cf; + + ReadTask(String path, long start, long end, CompletableFuture cf) { + this.path = path; + this.start = start; + this.end = end; + this.cf = cf; + } + + public String path() { + return path; + } + + public long start() { + return start; + } + + public long end() { + return end; + } + + public CompletableFuture cf() { + return cf; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ReadTask) obj; + return Objects.equals(this.path, that.path) && + this.start == that.start && + this.end == that.end && + Objects.equals(this.cf, that.cf); + } + + @Override + public int hashCode() { + return Objects.hash(path, start, end, cf); + } + + @Override + public String toString() { + return "ReadTask[" + + "path=" + path + ", " + + "start=" + start + ", " + + "end=" + end + ", " + + "cf=" + cf + ']'; + } } public static class Builder { diff --git a/s3stream/src/main/java/com/automq/stream/s3/trace/aop/S3StreamTraceAspect.java b/s3stream/src/main/java/com/automq/stream/s3/trace/aop/S3StreamTraceAspect.java index 3490797fb..a4923c5df 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/trace/aop/S3StreamTraceAspect.java +++ b/s3stream/src/main/java/com/automq/stream/s3/trace/aop/S3StreamTraceAspect.java @@ -35,7 +35,8 @@ public void trace(WithSpan withSpan) { @Around(value = "trace(withSpan) && execution(* com.automq.stream..*(..))", argNames = "joinPoint,withSpan") public Object createSpan(ProceedingJoinPoint joinPoint, WithSpan withSpan) throws Throwable { Object[] args = joinPoint.getArgs(); - if (args.length > 0 && args[0] instanceof TraceContext context) { + if (args.length > 0 && args[0] instanceof TraceContext) { + TraceContext context = (TraceContext) args[0]; return TraceUtils.trace(context, joinPoint, withSpan); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java index 95b00dc23..849a72300 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -713,15 +714,56 @@ public String toString() { } } - record AppendResultImpl(long recordOffset, CompletableFuture future) implements AppendResult { + static final class AppendResultImpl implements AppendResult { + private final long recordOffset; + private final CompletableFuture future; + + AppendResultImpl(long recordOffset, CompletableFuture future) { + this.recordOffset = recordOffset; + this.future = future; + } @Override public String toString() { return "AppendResultImpl{" + "recordOffset=" + recordOffset + '}'; } + + @Override + public long recordOffset() { + return recordOffset; + } + + @Override + public CompletableFuture future() { + return future; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (AppendResultImpl) obj; + return this.recordOffset == that.recordOffset && + Objects.equals(this.future, that.future); + } + + @Override + public int hashCode() { + return Objects.hash(recordOffset, future); + } + } - record RecoverResultImpl(ByteBuf record, long recordOffset) implements RecoverResult { + static final class RecoverResultImpl implements RecoverResult { + private final ByteBuf record; + private final long recordOffset; + + RecoverResultImpl(ByteBuf record, long recordOffset) { + this.record = record; + this.recordOffset = recordOffset; + } @Override public String toString() { @@ -730,6 +772,33 @@ public String toString() { + ", recordOffset=" + recordOffset + '}'; } + + @Override + public ByteBuf record() { + return record; + } + + @Override + public long recordOffset() { + return recordOffset; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (RecoverResultImpl) obj; + return Objects.equals(this.record, that.record) && + this.recordOffset == that.recordOffset; + } + + @Override + public int hashCode() { + return Objects.hash(record, recordOffset); + } + } static class ReadRecordException extends Exception { diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java index f50042617..da60081d7 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.NavigableSet; +import java.util.Objects; import java.util.Random; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; @@ -325,7 +326,62 @@ public Result reset() { return new Result(countValue, costNanosValue, maxCostNanosValue, elapsedTimeNanos); } - public record Result(long count, long costNanos, long maxCostNanos, long elapsedTimeNanos) { + public static final class Result { + private final long count; + private final long costNanos; + private final long maxCostNanos; + private final long elapsedTimeNanos; + + public Result(long count, long costNanos, long maxCostNanos, long elapsedTimeNanos) { + this.count = count; + this.costNanos = costNanos; + this.maxCostNanos = maxCostNanos; + this.elapsedTimeNanos = elapsedTimeNanos; + } + + public long count() { + return count; + } + + public long costNanos() { + return costNanos; + } + + public long maxCostNanos() { + return maxCostNanos; + } + + public long elapsedTimeNanos() { + return elapsedTimeNanos; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (Result) obj; + return this.count == that.count && + this.costNanos == that.costNanos && + this.maxCostNanos == that.maxCostNanos && + this.elapsedTimeNanos == that.elapsedTimeNanos; + } + + @Override + public int hashCode() { + return Objects.hash(count, costNanos, maxCostNanos, elapsedTimeNanos); + } + + @Override + public String toString() { + return "Result[" + + "count=" + count + ", " + + "costNanos=" + costNanos + ", " + + "maxCostNanos=" + maxCostNanos + ", " + + "elapsedTimeNanos=" + elapsedTimeNanos + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/AsyncRateLimiter.java b/s3stream/src/main/java/com/automq/stream/utils/AsyncRateLimiter.java index af0cff6e8..9602311ea 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/AsyncRateLimiter.java +++ b/s3stream/src/main/java/com/automq/stream/utils/AsyncRateLimiter.java @@ -18,6 +18,7 @@ package com.automq.stream.utils; import com.google.common.util.concurrent.RateLimiter; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; @@ -69,7 +70,44 @@ private synchronized void tick() { } } - record Acquire(CompletableFuture cf, int size) { - } + static final class Acquire { + private final CompletableFuture cf; + private final int size; + + Acquire(CompletableFuture cf, int size) { + this.cf = cf; + this.size = size; + } + + public CompletableFuture cf() { + return cf; + } + + public int size() { + return size; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (Acquire) obj; + return Objects.equals(this.cf, that.cf) && + this.size == that.size; + } + @Override + public int hashCode() { + return Objects.hash(cf, size); + } + + @Override + public String toString() { + return "Acquire[" + + "cf=" + cf + ", " + + "size=" + size + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java b/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java index d835ea18b..84590eb57 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java +++ b/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java @@ -109,7 +109,8 @@ public S3CheckTask(S3Context context, String taskName) { } protected static void showErrorInfo(Exception e) { - if (e.getCause() instanceof S3Exception se) { + if (e.getCause() instanceof S3Exception) { + S3Exception se = (S3Exception) e.getCause(); // Do not use system.err because automq admin tool suppress system.err System.out.println("get S3 exception: "); se.printStackTrace(System.out); diff --git a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/IndexBlockOrderedBytes.java b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/IndexBlockOrderedBytes.java index 5b8abc052..172210971 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/IndexBlockOrderedBytes.java +++ b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/IndexBlockOrderedBytes.java @@ -19,6 +19,7 @@ import com.automq.stream.s3.DataBlockIndex; import com.automq.stream.s3.ObjectReader; +import java.util.Objects; public class IndexBlockOrderedBytes extends AbstractOrderedCollection { private final ObjectReader.IndexBlock indexBlock; @@ -37,12 +38,55 @@ protected ComparableItem get(int index) { return new ComparableStreamRange(indexBlock.get(index)); } - public record TargetStreamOffset(long streamId, long offset) { + public static final class TargetStreamOffset { + private final long streamId; + private final long offset; + + public TargetStreamOffset(long streamId, long offset) { + this.streamId = streamId; + this.offset = offset; + } + + public long streamId() { + return streamId; + } + + public long offset() { + return offset; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (TargetStreamOffset) obj; + return this.streamId == that.streamId && + this.offset == that.offset; + } + + @Override + public int hashCode() { + return Objects.hash(streamId, offset); + } + + @Override + public String toString() { + return "TargetStreamOffset[" + + "streamId=" + streamId + ", " + + "offset=" + offset + ']'; + } } - private record ComparableStreamRange(DataBlockIndex index) + private static final class ComparableStreamRange implements ComparableItem { + private final DataBlockIndex index; + + private ComparableStreamRange(DataBlockIndex index) { + this.index = index; + } public long endOffset() { return index.endOffset(); @@ -69,5 +113,31 @@ public boolean isGreaterThan(TargetStreamOffset value) { return this.index().startOffset() > value.offset; } } + + public DataBlockIndex index() { + return index; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ComparableStreamRange) obj; + return Objects.equals(this.index, that.index); + } + + @Override + public int hashCode() { + return Objects.hash(index); + } + + @Override + public String toString() { + return "ComparableStreamRange[" + + "index=" + index + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java index 934d5a340..8377d9890 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java +++ b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java @@ -20,6 +20,7 @@ import com.automq.stream.s3.model.StreamRecordBatch; import java.util.ArrayList; import java.util.List; +import java.util.Objects; public class StreamRecordBatchList extends AbstractOrderedCollection { @@ -44,7 +45,13 @@ protected ComparableItem get(int index) { return records.get(index); } - private record ComparableStreamRecordBatch(StreamRecordBatch recordBatch) implements ComparableItem { + private static final class ComparableStreamRecordBatch implements ComparableItem { + private final StreamRecordBatch recordBatch; + + private ComparableStreamRecordBatch(StreamRecordBatch recordBatch) { + this.recordBatch = recordBatch; + } + @Override public boolean isLessThan(Long value) { return recordBatch.getLastOffset() <= value; @@ -54,5 +61,31 @@ public boolean isLessThan(Long value) { public boolean isGreaterThan(Long value) { return recordBatch.getBaseOffset() > value; } + + public StreamRecordBatch recordBatch() { + return recordBatch; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ComparableStreamRecordBatch) obj; + return Objects.equals(this.recordBatch, that.recordBatch); + } + + @Override + public int hashCode() { + return Objects.hash(recordBatch); + } + + @Override + public String toString() { + return "ComparableStreamRecordBatch[" + + "recordBatch=" + recordBatch + ']'; + } + } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java index 64685a0f0..8668caf88 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java @@ -185,10 +185,17 @@ void testCopyWrite() throws NoSuchMethodException, InvocationTargetException, Il for (int i = 0; i < 3; i++) { int partNum = uploadPartRequests.get(i).partNumber(); switch (partNum) { - case 2 -> assertEquals(120L, writeContentLengths.get(i)); - case 3 -> assertEquals(280L, writeContentLengths.get(i)); - case 4 -> assertEquals(10L, writeContentLengths.get(i)); - default -> throw new IllegalStateException(); + case 2: + assertEquals(120L, writeContentLengths.get(i)); + break; + case 3: + assertEquals(280L, writeContentLengths.get(i)); + break; + case 4: + assertEquals(10L, writeContentLengths.get(i)); + break; + default: + throw new IllegalStateException(); } }