diff --git a/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java b/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java index 20c10addd..90e52a40d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java +++ b/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java @@ -18,9 +18,10 @@ package com.automq.stream.s3.model; import com.automq.stream.s3.StreamRecordBatchCodec; +import com.automq.stream.utils.biniarysearch.primitive.LongComparableItem; import io.netty.buffer.ByteBuf; -public class StreamRecordBatch implements Comparable { +public class StreamRecordBatch implements Comparable, LongComparableItem { private final long streamId; private final long epoch; private final long baseOffset; @@ -113,4 +114,14 @@ public String toString() { ", count=" + count + ", size=" + size() + '}'; } + + @Override + public boolean isLessThan(long value) { + return getLastOffset() <= value; + } + + @Override + public boolean isGreaterThan(long value) { + return getBaseOffset() > value; + } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java index 934d5a340..539886ae9 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java +++ b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/StreamRecordBatchList.java @@ -18,19 +18,18 @@ package com.automq.stream.utils.biniarysearch; import com.automq.stream.s3.model.StreamRecordBatch; -import java.util.ArrayList; +import com.automq.stream.utils.biniarysearch.primitive.LongComparableItem; +import com.automq.stream.utils.biniarysearch.primitive.LongOrderedCollection; + import java.util.List; -public class StreamRecordBatchList extends AbstractOrderedCollection { +public class StreamRecordBatchList extends LongOrderedCollection { - private final List records; + private final List records; private final int size; public StreamRecordBatchList(List records) { - this.records = new ArrayList<>(records.size()); - for (StreamRecordBatch record : records) { - this.records.add(new ComparableStreamRecordBatch(record)); - } + this.records = records; this.size = records.size(); } @@ -40,19 +39,8 @@ public int size() { } @Override - protected ComparableItem get(int index) { + protected LongComparableItem get(int index) { return records.get(index); } - private record ComparableStreamRecordBatch(StreamRecordBatch recordBatch) implements ComparableItem { - @Override - public boolean isLessThan(Long value) { - return recordBatch.getLastOffset() <= value; - } - - @Override - public boolean isGreaterThan(Long value) { - return recordBatch.getBaseOffset() > value; - } - } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/primitive/LongComparableItem.java b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/primitive/LongComparableItem.java new file mode 100644 index 000000000..cd91ec77e --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/primitive/LongComparableItem.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.automq.stream.utils.biniarysearch.primitive; + +public interface LongComparableItem { + boolean isLessThan(long value); + + boolean isGreaterThan(long value); +} diff --git a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/primitive/LongOrderedCollection.java b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/primitive/LongOrderedCollection.java new file mode 100644 index 000000000..457dcc955 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/primitive/LongOrderedCollection.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.automq.stream.utils.biniarysearch.primitive; + +public abstract class LongOrderedCollection { + + protected abstract int size(); + + protected abstract LongComparableItem get(int index); + + public int search(long target) { + int low = 0; + int high = size() - 1; + while (low <= high) { + int mid = low + ((high - low) >>> 1); + LongComparableItem midVal = get(mid); + if (midVal.isLessThan(target)) { + low = mid + 1; + } else if (midVal.isGreaterThan(target)) { + high = mid - 1; + } else { + low = mid; + break; + } + } + if (low > high) { + return -1; + } + return low; + } +}