Skip to content

Commit

Permalink
[fix][test] Fix thread leaks in Managed Ledger tests and remove dupli…
Browse files Browse the repository at this point in the history
…cate shutdown code (apache#21426)
  • Loading branch information
lhotari authored Oct 24, 2023
1 parent c702be1 commit c6704df
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -535,13 +536,12 @@ public CompletableFuture<Void> shutdownAsync() throws ManagedLedgerException {
int numLedgers = ledgerNames.size();
log.info("Closing {} ledgers", numLedgers);
for (String ledgerName : ledgerNames) {
CompletableFuture<Void> future = new CompletableFuture<>();
futures.add(future);
CompletableFuture<ManagedLedgerImpl> ledgerFuture = ledgers.remove(ledgerName);
if (ledgerFuture == null) {
future.complete(null);
continue;
}
CompletableFuture<Void> future = new CompletableFuture<>();
futures.add(future);
ledgerFuture.whenCompleteAsync((managedLedger, throwable) -> {
if (throwable != null || managedLedger == null) {
future.complete(null);
Expand Down Expand Up @@ -606,68 +606,20 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}));
}
}));
entryCacheManager.clear();
return FutureUtil.waitForAll(futures).thenAccept(__ -> {
return FutureUtil.waitForAll(futures).thenAcceptAsync(__ -> {
//wait for tasks in scheduledExecutor executed.
scheduledExecutor.shutdown();
scheduledExecutor.shutdownNow();
entryCacheManager.clear();
});
}

@Override
public void shutdown() throws InterruptedException, ManagedLedgerException {
if (closed) {
throw new ManagedLedgerException.ManagedLedgerFactoryClosedException();
try {
shutdownAsync().get();
} catch (ExecutionException e) {
throw getManagedLedgerException(e.getCause());
}
closed = true;

statsTask.cancel(true);
flushCursorsTask.cancel(true);
cacheEvictionExecutor.shutdownNow();

// take a snapshot of ledgers currently in the map to prevent race conditions
List<CompletableFuture<ManagedLedgerImpl>> ledgers = new ArrayList<>(this.ledgers.values());
int numLedgers = ledgers.size();
final CountDownLatch latch = new CountDownLatch(numLedgers);
log.info("Closing {} ledgers", numLedgers);

for (CompletableFuture<ManagedLedgerImpl> ledgerFuture : ledgers) {
ManagedLedgerImpl ledger = ledgerFuture.getNow(null);
if (ledger == null) {
latch.countDown();
continue;
}

ledger.asyncClose(new AsyncCallbacks.CloseCallback() {
@Override
public void closeComplete(Object ctx) {
latch.countDown();
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Got exception when closing managed ledger: {}", ledger.getName(), exception);
latch.countDown();
}
}, null);
}

latch.await();
log.info("{} ledgers closed", numLedgers);

if (isBookkeeperManaged) {
try {
BookKeeper bookkeeper = bookkeeperFactory.get();
if (bookkeeper != null) {
bookkeeper.close();
}
} catch (BKException e) {
throw new ManagedLedgerException(e);
}
}

scheduledExecutor.shutdownNow();

entryCacheManager.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,10 @@ private DeleteLedgerInfo makeDelayIfDoLedgerDelete(LedgerHandle ledger, final At
bkc.asyncDeleteLedger(ledgerId, originalCb, ctx);
} else {
deleteLedgerInfo.hasCalled = true;
new Thread(() -> {
cachedExecutor.submit(() -> {
Awaitility.await().atMost(Duration.ofSeconds(60)).until(signal::get);
bkc.asyncDeleteLedger(ledgerId, cb, ctx);
}).start();
});
}
return null;
}).when(spyBookKeeper).asyncDeleteLedger(any(long.class), any(AsyncCallback.DeleteCallback.class), any());
Expand All @@ -208,6 +208,7 @@ private DeleteLedgerInfo makeDelayIfDoLedgerDelete(LedgerHandle ledger, final At
public void testLedgerInfoMetaCorrectIfAddEntryTimeOut() throws Exception {
String mlName = "testLedgerInfoMetaCorrectIfAddEntryTimeOut";
BookKeeper spyBookKeeper = spy(bkc);
@Cleanup("shutdown")
ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);

Expand Down Expand Up @@ -3854,6 +3855,7 @@ public void testCancellationOfScheduledTasks() throws Exception {
public void testInactiveLedgerRollOver() throws Exception {
int inactiveLedgerRollOverTimeMs = 5;
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -3885,11 +3887,11 @@ public void testInactiveLedgerRollOver() throws Exception {
List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
assertEquals(ledgers.size(), totalAddEntries);
ledger.close();
factory.shutdown();
}

@Test
public void testOffloadTaskCancelled() throws Exception {
@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ protected void startZKCluster() throws Exception {
zkc = zkUtil.getZooKeeperClient();
metadataStore = new FaultInjectionMetadataStore(
MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
MetadataStoreConfig.builder().build()));
MetadataStoreConfig.builder()
.metadataStoreName("metastore-" + getClass().getSimpleName())
.build()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.SneakyThrows;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
Expand Down Expand Up @@ -70,7 +71,8 @@ public MockedBookKeeperTestCase(int numBookies) {
public final void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
metadataStore = new FaultInjectionMetadataStore(
MetadataStoreExtended.create("memory:local", MetadataStoreConfig.builder().build()));
MetadataStoreExtended.create("memory:local",
MetadataStoreConfig.builder().metadataStoreName("metastore-" + method.getName()).build()));

try {
// start bookkeeper service
Expand Down Expand Up @@ -102,7 +104,11 @@ public final void tearDown(Method method) {
}
try {
LOG.info("@@@@@@@@@ stopping " + method);
factory.shutdownAsync().get(10, TimeUnit.SECONDS);
try {
factory.shutdownAsync().get(10, TimeUnit.SECONDS);
} catch (ManagedLedgerException.ManagedLedgerFactoryClosedException e) {
// ignore
}
factory = null;
stopBookKeeper();
metadataStore.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,8 @@ private void initializeCommonPulsarServices(SpyConfig spyConfig) {
} else {
try {
MetadataStoreExtended store = MetadataStoreFactoryImpl.createExtended("memory:local",
MetadataStoreConfig.builder().build());
MetadataStoreConfig.builder()
.metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
registerCloseable(() -> {
store.close();
resetSpyOrMock(store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
protected abstract CompletableFuture<Boolean> existsFromStore(String path);

protected AbstractMetadataStore(String metadataStoreName) {
this.executor = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory(metadataStoreName));
this.executor = new ScheduledThreadPoolExecutor(1,
new DefaultThreadFactory(
StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName()));
registerListener(this);

this.childrenCache = Caffeine.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ protected void startZKCluster() throws Exception {
zkc = zkUtil.getZooKeeperClient();
metadataStore = new FaultInjectionMetadataStore(
MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
MetadataStoreConfig.builder().build()));
MetadataStoreConfig.builder().metadataStoreName("metastore-" + getClass().getSimpleName()).build()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ public MockedBookKeeperTestCase(int numBookies) {
public void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
metadataStore = new FaultInjectionMetadataStore(MetadataStoreExtended.create("memory:local",
MetadataStoreConfig.builder().build()));
MetadataStoreConfig.builder()
.metadataStoreName("metastore-" + method.getName())
.build()));
try {
// start bookkeeper service
startBookKeeper();
Expand Down

0 comments on commit c6704df

Please sign in to comment.