Skip to content

Commit

Permalink
[improve][broker] Use RoaringBitmap in tracking individual acks to re…
Browse files Browse the repository at this point in the history
…duce memory usage (apache#23006)
  • Loading branch information
lhotari authored Jul 6, 2024
1 parent 41ef3f6 commit ed39c4d
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 16 deletions.
2 changes: 1 addition & 1 deletion distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ The Apache Software License, Version 2.0
* RxJava
- io.reactivex.rxjava3-rxjava-3.0.1.jar
* RoaringBitmap
- org.roaringbitmap-RoaringBitmap-1.0.6.jar
- org.roaringbitmap-RoaringBitmap-1.2.0.jar
* OpenTelemetry
- io.opentelemetry-opentelemetry-api-1.38.0.jar
- io.opentelemetry-opentelemetry-api-incubator-1.38.0-alpha.jar
Expand Down
2 changes: 2 additions & 0 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ The Apache Software License, Version 2.0
- simpleclient_tracer_common-0.16.0.jar
- simpleclient_tracer_otel-0.16.0.jar
- simpleclient_tracer_otel_agent-0.16.0.jar
* RoaringBitmap
- RoaringBitmap-1.2.0.jar
* Log4J
- log4j-api-2.23.1.jar
- log4j-core-2.23.1.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public RangeSetWrapper(LongPairConsumer<T> rangeConverter,
this.config = managedCursor.getManagedLedger().getConfig();
this.rangeConverter = rangeConverter;
this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled()
? new OpenLongPairRangeSet<>(4096, rangeConverter)
? new OpenLongPairRangeSet<>(rangeConverter)
: new LongPairRangeSet.DefaultRangeSet<>(rangeConverter, rangeBoundConsumer);
this.enableMultiEntry = config.isPersistentUnackedRangesWithMultipleEntriesEnabled();
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ flexible messaging model and an intuitive client API.</description>
<j2objc-annotations.version>1.3</j2objc-annotations.version>
<lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
<dependency-check-maven.version>10.0.1</dependency-check-maven.version>
<roaringbitmap.version>1.0.6</roaringbitmap.version>
<roaringbitmap.version>1.2.0</roaringbitmap.version>
<extra-enforcer-rules.version>1.6.1</extra-enforcer-rules.version>
<oshi.version>6.4.0</oshi.version>
<checkerframework.version>3.33.0</checkerframework.version>
Expand Down
5 changes: 5 additions & 0 deletions pulsar-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.lang.mutable.MutableInt;
import org.roaringbitmap.RoaringBitSet;

/**
* A Concurrent set comprising zero or more ranges of type {@link LongPair}. This can be alternative of
Expand All @@ -46,8 +47,6 @@
public class OpenLongPairRangeSet<T extends Comparable<T>> implements LongPairRangeSet<T> {

protected final NavigableMap<Long, BitSet> rangeBitSetMap = new ConcurrentSkipListMap<>();
private boolean threadSafe = true;
private final int bitSetSize;
private final LongPairConsumer<T> consumer;

// caching place-holder for cpu-optimization to avoid calculating ranges again
Expand All @@ -57,16 +56,6 @@ public class OpenLongPairRangeSet<T extends Comparable<T>> implements LongPairRa
private volatile boolean updatedAfterCachedForToString = true;

public OpenLongPairRangeSet(LongPairConsumer<T> consumer) {
this(1024, true, consumer);
}

public OpenLongPairRangeSet(int size, LongPairConsumer<T> consumer) {
this(size, true, consumer);
}

public OpenLongPairRangeSet(int size, boolean threadSafe, LongPairConsumer<T> consumer) {
this.threadSafe = threadSafe;
this.bitSetSize = size;
this.consumer = consumer;
}

Expand Down Expand Up @@ -416,7 +405,7 @@ private int getSafeEntry(long value) {
}

private BitSet createNewBitSet() {
return this.threadSafe ? new ConcurrentBitSet(bitSetSize) : new BitSet(bitSetSize);
return new RoaringBitSet();
}

}

0 comments on commit ed39c4d

Please sign in to comment.