diff --git a/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java b/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java index 20c10addd..c2ed3bb39 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java +++ b/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java @@ -18,9 +18,10 @@ package com.automq.stream.s3.model; import com.automq.stream.s3.StreamRecordBatchCodec; +import com.automq.stream.utils.biniarysearch.ComparableItem; import io.netty.buffer.ByteBuf; -public class StreamRecordBatch implements Comparable { +public class StreamRecordBatch implements Comparable, ComparableItem { private final long streamId; private final long epoch; private final long baseOffset; @@ -113,4 +114,14 @@ public String toString() { ", count=" + count + ", size=" + size() + '}'; } + + @Override + public boolean isLessThan(Long value) { + return getLastOffset() <= value; + } + + @Override + public boolean isGreaterThan(Long value) { + return getBaseOffset() > value; + } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java index 8377d9890..ef5ffd671 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java +++ b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java @@ -18,20 +18,16 @@ package com.automq.stream.utils.biniarysearch; import com.automq.stream.s3.model.StreamRecordBatch; -import java.util.ArrayList; + import java.util.List; -import java.util.Objects; public class StreamRecordBatchList extends AbstractOrderedCollection { - private final List records; + private final List records; private final int size; public StreamRecordBatchList(List records) { - this.records = new ArrayList<>(records.size()); - for (StreamRecordBatch record : records) { - this.records.add(new ComparableStreamRecordBatch(record)); - } + this.records = records; this.size = records.size(); } @@ -45,47 +41,4 @@ protected ComparableItem get(int index) { return records.get(index); } - private static final class ComparableStreamRecordBatch implements ComparableItem { - private final StreamRecordBatch recordBatch; - - private ComparableStreamRecordBatch(StreamRecordBatch recordBatch) { - this.recordBatch = recordBatch; - } - - @Override - public boolean isLessThan(Long value) { - return recordBatch.getLastOffset() <= value; - } - - @Override - public boolean isGreaterThan(Long value) { - return recordBatch.getBaseOffset() > value; - } - - public StreamRecordBatch recordBatch() { - return recordBatch; - } - - @Override - public boolean equals(Object obj) { - if (obj == this) - return true; - if (obj == null || obj.getClass() != this.getClass()) - return false; - var that = (ComparableStreamRecordBatch) obj; - return Objects.equals(this.recordBatch, that.recordBatch); - } - - @Override - public int hashCode() { - return Objects.hash(recordBatch); - } - - @Override - public String toString() { - return "ComparableStreamRecordBatch[" + - "recordBatch=" + recordBatch + ']'; - } - - } }