Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
TransactionCoordinator: prevent thread leaks during broker shutdown (#…
Browse files Browse the repository at this point in the history
…813)

I noticed this thread leak during the executions of the tests

```
"Thread-3" #163 prio=5 os_prio=0 tid=0x00007f0c34011800 nid=0x603a sleeping[0x00007f0c0dea4000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMarkerChannelManager.lambda$new$0(TransactionMarkerChannelManager.java:148)
        at io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMarkerChannelManager$$Lambda$624/1475886210.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:748)
```
this patch fixes the problem

Co-authored-by: Enrico Olivelli <[email protected]>
  • Loading branch information
2 people authored and BewareMyPower committed Oct 20, 2021
1 parent 97c49eb commit 43d1180
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,7 @@ public void shutdown() {
log.info("Shutting down transaction coordinator ...");
producerIdManager.shutdown();
txnManager.shutdown();
transactionMarkerChannelManager.close();
// TODO shutdown txn
log.info("Shutdown transaction coordinator complete.");
}
Expand All @@ -871,5 +872,4 @@ public List<FetchResponse.AbortedTransaction> getAbortedIndexList(long fetchOffs
}
return abortedTransactions;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class TransactionMarkerChannelManager {
private Map<InetSocketAddress, TxnMarkerQueue> markersQueuePerBroker = new ConcurrentHashMap<>();
private TxnMarkerQueue markersQueueForUnknownBroker = new TxnMarkerQueue(null);
private BlockingQueue<PendingCompleteTxn> txnLogAppendRetryQueue = new LinkedBlockingQueue<>();
private volatile boolean closed;

@AllArgsConstructor
private static class PendingCompleteTxn {
Expand Down Expand Up @@ -141,16 +142,18 @@ public TransactionMarkerChannelManager(KafkaServiceConfiguration kafkaConfig,
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new TransactionMarkerChannelInitializer(kafkaConfig, enableTls));

new Thread(() -> {
while (true) {
Thread thread = new Thread(() -> {
while (!closed) {
drainQueuedTransactionMarkers();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}, "kop-transaction-channel-manager");
thread.setDaemon(true);
thread.start();
}

public CompletableFuture<TransactionMarkerChannelHandler> getChannel(InetSocketAddress socketAddress) {
Expand Down Expand Up @@ -410,4 +413,8 @@ private void drainQueuedTransactionMarkers() {
}
}

public void close() {
this.closed = true;
}

}

0 comments on commit 43d1180

Please sign in to comment.