diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 03605bf6e8519f..40e5411d7773be 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -535,13 +536,12 @@ public CompletableFuture shutdownAsync() throws ManagedLedgerException { int numLedgers = ledgerNames.size(); log.info("Closing {} ledgers", numLedgers); for (String ledgerName : ledgerNames) { - CompletableFuture future = new CompletableFuture<>(); - futures.add(future); CompletableFuture ledgerFuture = ledgers.remove(ledgerName); if (ledgerFuture == null) { - future.complete(null); continue; } + CompletableFuture future = new CompletableFuture<>(); + futures.add(future); ledgerFuture.whenCompleteAsync((managedLedger, throwable) -> { if (throwable != null || managedLedger == null) { future.complete(null); @@ -606,68 +606,20 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { })); } })); - entryCacheManager.clear(); - return FutureUtil.waitForAll(futures).thenAccept(__ -> { + return FutureUtil.waitForAll(futures).thenAcceptAsync(__ -> { //wait for tasks in scheduledExecutor executed. - scheduledExecutor.shutdown(); + scheduledExecutor.shutdownNow(); + entryCacheManager.clear(); }); } @Override public void shutdown() throws InterruptedException, ManagedLedgerException { - if (closed) { - throw new ManagedLedgerException.ManagedLedgerFactoryClosedException(); + try { + shutdownAsync().get(); + } catch (ExecutionException e) { + throw getManagedLedgerException(e.getCause()); } - closed = true; - - statsTask.cancel(true); - flushCursorsTask.cancel(true); - cacheEvictionExecutor.shutdownNow(); - - // take a snapshot of ledgers currently in the map to prevent race conditions - List> ledgers = new ArrayList<>(this.ledgers.values()); - int numLedgers = ledgers.size(); - final CountDownLatch latch = new CountDownLatch(numLedgers); - log.info("Closing {} ledgers", numLedgers); - - for (CompletableFuture ledgerFuture : ledgers) { - ManagedLedgerImpl ledger = ledgerFuture.getNow(null); - if (ledger == null) { - latch.countDown(); - continue; - } - - ledger.asyncClose(new AsyncCallbacks.CloseCallback() { - @Override - public void closeComplete(Object ctx) { - latch.countDown(); - } - - @Override - public void closeFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}] Got exception when closing managed ledger: {}", ledger.getName(), exception); - latch.countDown(); - } - }, null); - } - - latch.await(); - log.info("{} ledgers closed", numLedgers); - - if (isBookkeeperManaged) { - try { - BookKeeper bookkeeper = bookkeeperFactory.get(); - if (bookkeeper != null) { - bookkeeper.close(); - } - } catch (BKException e) { - throw new ManagedLedgerException(e); - } - } - - scheduledExecutor.shutdownNow(); - - entryCacheManager.clear(); } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index cd61e00ccaa8ec..5bd6c299d99852 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -188,10 +188,10 @@ private DeleteLedgerInfo makeDelayIfDoLedgerDelete(LedgerHandle ledger, final At bkc.asyncDeleteLedger(ledgerId, originalCb, ctx); } else { deleteLedgerInfo.hasCalled = true; - new Thread(() -> { + cachedExecutor.submit(() -> { Awaitility.await().atMost(Duration.ofSeconds(60)).until(signal::get); bkc.asyncDeleteLedger(ledgerId, cb, ctx); - }).start(); + }); } return null; }).when(spyBookKeeper).asyncDeleteLedger(any(long.class), any(AsyncCallback.DeleteCallback.class), any()); @@ -208,6 +208,7 @@ private DeleteLedgerInfo makeDelayIfDoLedgerDelete(LedgerHandle ledger, final At public void testLedgerInfoMetaCorrectIfAddEntryTimeOut() throws Exception { String mlName = "testLedgerInfoMetaCorrectIfAddEntryTimeOut"; BookKeeper spyBookKeeper = spy(bkc); + @Cleanup("shutdown") ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper); ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName); @@ -3854,6 +3855,7 @@ public void testCancellationOfScheduledTasks() throws Exception { public void testInactiveLedgerRollOver() throws Exception { int inactiveLedgerRollOverTimeMs = 5; ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig(); + @Cleanup("shutdown") ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc); ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs, TimeUnit.MILLISECONDS); @@ -3885,11 +3887,11 @@ public void testInactiveLedgerRollOver() throws Exception { List ledgers = ledger.getLedgersInfoAsList(); assertEquals(ledgers.size(), totalAddEntries); ledger.close(); - factory.shutdown(); } @Test public void testOffloadTaskCancelled() throws Exception { + @Cleanup("shutdown") ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc); ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(2); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 80bb6256591bc1..0ddd04ebc4830b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -240,7 +240,9 @@ protected void startZKCluster() throws Exception { zkc = zkUtil.getZooKeeperClient(); metadataStore = new FaultInjectionMetadataStore( MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(), - MetadataStoreConfig.builder().build())); + MetadataStoreConfig.builder() + .metadataStoreName("metastore-" + getClass().getSimpleName()) + .build())); } /** diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java index e2101268b09e69..645563eb78c4d8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java @@ -26,6 +26,7 @@ import lombok.SneakyThrows; import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.pulsar.metadata.api.MetadataStoreConfig; @@ -70,7 +71,8 @@ public MockedBookKeeperTestCase(int numBookies) { public final void setUp(Method method) throws Exception { LOG.info(">>>>>> starting {}", method); metadataStore = new FaultInjectionMetadataStore( - MetadataStoreExtended.create("memory:local", MetadataStoreConfig.builder().build())); + MetadataStoreExtended.create("memory:local", + MetadataStoreConfig.builder().metadataStoreName("metastore-" + method.getName()).build())); try { // start bookkeeper service @@ -102,7 +104,11 @@ public final void tearDown(Method method) { } try { LOG.info("@@@@@@@@@ stopping " + method); - factory.shutdownAsync().get(10, TimeUnit.SECONDS); + try { + factory.shutdownAsync().get(10, TimeUnit.SECONDS); + } catch (ManagedLedgerException.ManagedLedgerFactoryClosedException e) { + // ignore + } factory = null; stopBookKeeper(); metadataStore.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index c927a2e61d85e8..2e28ea8e70e10c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -589,7 +589,8 @@ private void initializeCommonPulsarServices(SpyConfig spyConfig) { } else { try { MetadataStoreExtended store = MetadataStoreFactoryImpl.createExtended("memory:local", - MetadataStoreConfig.builder().build()); + MetadataStoreConfig.builder() + .metadataStoreName(MetadataStoreConfig.METADATA_STORE).build()); registerCloseable(() -> { store.close(); resetSpyOrMock(store); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 7bc881c1254217..9ba2588a07cf0a 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -89,7 +89,9 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co protected abstract CompletableFuture existsFromStore(String path); protected AbstractMetadataStore(String metadataStoreName) { - this.executor = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory(metadataStoreName)); + this.executor = new ScheduledThreadPoolExecutor(1, + new DefaultThreadFactory( + StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName())); registerListener(this); this.childrenCache = Caffeine.newBuilder() diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java index c681a1f0764eef..9a8e3ef5a2d4ff 100644 --- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java +++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java @@ -238,7 +238,7 @@ protected void startZKCluster() throws Exception { zkc = zkUtil.getZooKeeperClient(); metadataStore = new FaultInjectionMetadataStore( MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(), - MetadataStoreConfig.builder().build())); + MetadataStoreConfig.builder().metadataStoreName("metastore-" + getClass().getSimpleName()).build())); } /** diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java index e0b10ca0280d20..ac5aa3bd8927ef 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java @@ -71,7 +71,9 @@ public MockedBookKeeperTestCase(int numBookies) { public void setUp(Method method) throws Exception { LOG.info(">>>>>> starting {}", method); metadataStore = new FaultInjectionMetadataStore(MetadataStoreExtended.create("memory:local", - MetadataStoreConfig.builder().build())); + MetadataStoreConfig.builder() + .metadataStoreName("metastore-" + method.getName()) + .build())); try { // start bookkeeper service startBookKeeper();