Skip to content

Commit

Permalink
[fix] Memory leak of the map in ThreadLocalAccessor (streamnative#2006)
Browse files Browse the repository at this point in the history
Fix: streamnative#1956

### Modifications
Update stats on the netty threads to fix the stats memory leak issue.

The root cause is we can only use `DataSketchesOpStatsLogger` in the
netty thread. When used in other threads, it will cause memory leak,
since the `FastThreadLocal` can't remove the map if the thread is not
netty thread.
  • Loading branch information
BewareMyPower authored Sep 20, 2023
1 parent 9ab8411 commit 014340d
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.concurrent.EventExecutor;
import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndRequest;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.utils.GroupIdUtils;
Expand Down Expand Up @@ -45,6 +46,7 @@ protected MessageFetchContext newObject(Handle<MessageFetchContext> handle) {
private volatile KafkaTopicManager topicManager;
private volatile RequestStats statsLogger;
private volatile TransactionCoordinator tc;
private volatile EventExecutor eventExecutor;
private volatile String clientHost;
private volatile String namespacePrefix;
private volatile int maxReadEntriesNum;
Expand All @@ -64,6 +66,7 @@ public static MessageFetchContext get(KafkaRequestHandler requestHandler,
KafkaHeaderAndRequest kafkaHeaderAndRequest) {
MessageFetchContext context = RECYCLER.get();
context.requestHandler = requestHandler;
context.eventExecutor = requestHandler.ctx.executor();
context.sharedState = sharedState;
context.decodeExecutor = decodeExecutor;
context.topicManager = requestHandler.getTopicManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,20 @@
import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.Getter;
import lombok.NonNull;
import org.apache.bookkeeper.common.util.MathUtils;

/**
* Pending futures of PersistentTopic.
* It's used when multiple produce requests encountered while the partition's PersistentTopic was not available.
*/
public class PendingTopicFutures {

private final RequestStats requestStats;
private final long enqueueTimestamp;
private int count = 0;
private CompletableFuture<TopicThrowablePair> currentTopicFuture;

public PendingTopicFutures(RequestStats requestStats) {
this.requestStats = requestStats;
this.enqueueTimestamp = MathUtils.nowInNano();
}

private void registerQueueLatency(boolean success) {
if (requestStats != null) {
if (success) {
requestStats.getMessageQueuedLatencyStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(enqueueTimestamp), TimeUnit.NANOSECONDS);
} else {
requestStats.getMessageQueuedLatencyStats().registerFailedEvent(
MathUtils.elapsedNanos(enqueueTimestamp), TimeUnit.NANOSECONDS);
}
}
}
public PendingTopicFutures() {}

private synchronized void decrementCount() {
count--;
Expand All @@ -62,12 +43,10 @@ public synchronized void addListener(CompletableFuture<PartitionLog> topicFuture
count = 1;
// The first pending future comes
currentTopicFuture = topicFuture.thenApply(persistentTopic -> {
registerQueueLatency(true);
persistentTopicConsumer.accept(persistentTopic);
decrementCount();
return TopicThrowablePair.withTopic(persistentTopic);
}).exceptionally(e -> {
registerQueueLatency(false);
exceptionConsumer.accept(e.getCause());
decrementCount();
return TopicThrowablePair.withThrowable(e.getCause());
Expand All @@ -77,16 +56,13 @@ public synchronized void addListener(CompletableFuture<PartitionLog> topicFuture
// The next pending future reuses the completed result of the previous topic future
currentTopicFuture = currentTopicFuture.thenApply(topicThrowablePair -> {
if (topicThrowablePair.getThrowable() == null) {
registerQueueLatency(true);
persistentTopicConsumer.accept(topicThrowablePair.getPersistentTopicOpt());
} else {
registerQueueLatency(false);
exceptionConsumer.accept(topicThrowablePair.getThrowable());
}
decrementCount();
return topicThrowablePair;
}).exceptionally(e -> {
registerQueueLatency(false);
exceptionConsumer.accept(e.getCause());
decrementCount();
return TopicThrowablePair.withThrowable(e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.concurrent.EventExecutor;
import io.streamnative.pulsar.handlers.kop.RequestStats;
import io.streamnative.pulsar.handlers.kop.stats.StatsLogger;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -94,24 +95,28 @@ public void recycle() {
public void updateConsumerStats(final TopicPartition topicPartition,
int entrySize,
final String groupId,
RequestStats statsLogger) {
RequestStats statsLogger,
EventExecutor executor) {
final int numMessages = EntryFormatter.parseNumMessages(records);

final StatsLogger statsLoggerForThisPartition = statsLogger.getStatsLoggerForTopicPartition(topicPartition);

statsLoggerForThisPartition.getCounter(CONSUME_MESSAGE_CONVERSIONS).addCount(conversionCount);
statsLoggerForThisPartition.getOpStatsLogger(CONSUME_MESSAGE_CONVERSIONS_TIME_NANOS)
.registerSuccessfulEvent(conversionTimeNanos, TimeUnit.NANOSECONDS);
final StatsLogger statsLoggerForThisGroup;
if (groupId != null) {
statsLoggerForThisGroup = statsLogger.getStatsLoggerForTopicPartitionAndGroup(topicPartition, groupId);
} else {
statsLoggerForThisGroup = statsLoggerForThisPartition;
}
statsLoggerForThisGroup.getCounter(BYTES_OUT).addCount(records.sizeInBytes());
statsLoggerForThisGroup.getCounter(MESSAGE_OUT).addCount(numMessages);
statsLoggerForThisGroup.getCounter(ENTRIES_OUT).addCount(entrySize);

final long conversionTimeNanosCopy = conversionTimeNanos;
final int conversionCountCopy = conversionCount;
int sizeInBytes = records.sizeInBytes();
executor.execute(() -> {
statsLoggerForThisPartition.getCounter(CONSUME_MESSAGE_CONVERSIONS).addCount(conversionCountCopy);
statsLoggerForThisPartition.getOpStatsLogger(CONSUME_MESSAGE_CONVERSIONS_TIME_NANOS)
.registerSuccessfulEvent(conversionTimeNanosCopy, TimeUnit.NANOSECONDS);
final StatsLogger statsLoggerForThisGroup;
if (groupId != null) {
statsLoggerForThisGroup = statsLogger.getStatsLoggerForTopicPartitionAndGroup(topicPartition, groupId);
} else {
statsLoggerForThisGroup = statsLoggerForThisPartition;
}
statsLoggerForThisGroup.getCounter(BYTES_OUT).addCount(sizeInBytes);
statsLoggerForThisGroup.getCounter(MESSAGE_OUT).addCount(numMessages);
statsLoggerForThisGroup.getCounter(ENTRIES_OUT).addCount(entrySize);
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.concurrent.EventExecutor;
import io.streamnative.pulsar.handlers.kop.RequestStats;
import io.streamnative.pulsar.handlers.kop.stats.StatsLogger;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -81,21 +82,28 @@ public void recycle() {

public void updateProducerStats(final TopicPartition topicPartition,
final RequestStats requestStats,
final Producer producer) {
final Producer producer,
final EventExecutor executor) {
final int numBytes = encodedByteBuf.readableBytes();

producer.updateRates(numMessages, numBytes);
producer.getTopic().incrementPublishCount(numMessages, numBytes);

final StatsLogger statsLoggerForThisPartition = requestStats.getStatsLoggerForTopicPartition(topicPartition);
int numMessageCopy = numMessages;
int conversionCountCopy = conversionCount;
long conversionTimeNanosCopy = conversionTimeNanos;
executor.execute(() -> {
final StatsLogger statsLoggerForThisPartition =
requestStats.getStatsLoggerForTopicPartition(topicPartition);

statsLoggerForThisPartition.getCounter(BYTES_IN).addCount(numBytes);
statsLoggerForThisPartition.getCounter(MESSAGE_IN).addCount(numMessages);
statsLoggerForThisPartition.getCounter(PRODUCE_MESSAGE_CONVERSIONS).addCount(conversionCount);
statsLoggerForThisPartition.getOpStatsLogger(PRODUCE_MESSAGE_CONVERSIONS_TIME_NANOS)
.registerSuccessfulEvent(conversionTimeNanos, TimeUnit.NANOSECONDS);
statsLoggerForThisPartition.getCounter(BYTES_IN).addCount(numBytes);
statsLoggerForThisPartition.getCounter(MESSAGE_IN).addCount(numMessageCopy);
statsLoggerForThisPartition.getCounter(PRODUCE_MESSAGE_CONVERSIONS).addCount(conversionCountCopy);
statsLoggerForThisPartition.getOpStatsLogger(PRODUCE_MESSAGE_CONVERSIONS_TIME_NANOS)
.registerSuccessfulEvent(conversionTimeNanosCopy, TimeUnit.NANOSECONDS);

RequestStats.BATCH_COUNT_PER_MEMORY_RECORDS_INSTANCE.set(numMessages);
RequestStats.BATCH_COUNT_PER_MEMORY_RECORDS_INSTANCE.set(numMessageCopy);
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.streamnative.pulsar.handlers.kop.storage;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.EventExecutor;
import io.streamnative.pulsar.handlers.kop.KafkaTopicManager;
import io.streamnative.pulsar.handlers.kop.PendingTopicFutures;
import java.util.Map;
Expand All @@ -35,6 +36,7 @@ public class AppendRecordsContext {
private Consumer<Integer> completeSendOperationForThrottling;
private Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap;
private ChannelHandlerContext ctx;
private EventExecutor eventExecutor;

// recycler and get for this object
public static AppendRecordsContext get(final KafkaTopicManager topicManager,
Expand All @@ -46,7 +48,8 @@ public static AppendRecordsContext get(final KafkaTopicManager topicManager,
startSendOperationForThrottling,
completeSendOperationForThrottling,
pendingTopicFuturesMap,
ctx);
ctx,
ctx.executor());
}

}
Loading

0 comments on commit 014340d

Please sign in to comment.