Skip to content

Commit

Permalink
[fix][test] Fix thread leaks in tests by closing executors properly (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored Oct 24, 2023
1 parent 6518e4f commit cad4e75
Show file tree
Hide file tree
Showing 15 changed files with 101 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -234,6 +235,7 @@ public void channelValidationTest()
var channel = createChannel(pulsar);
int errorCnt = validateChannelStart(channel);
assertEquals(6, errorCnt);
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
Future startFuture = executor.submit(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.mockito.Mockito.verify;

import com.google.common.collect.Lists;
import lombok.Cleanup;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
Expand Down Expand Up @@ -127,6 +128,8 @@ public void testExecuteMoreThenOnceWhenFirstNotDone() throws InterruptedExceptio
PulsarService pulsar = mock(PulsarService.class);
NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(1);
doAnswer(__ -> CompletableFuture.supplyAsync(() -> {
try {
// Delay 5 seconds to finish.
Expand All @@ -135,9 +138,10 @@ public void testExecuteMoreThenOnceWhenFirstNotDone() throws InterruptedExceptio
throw new RuntimeException(e);
}
return Lists.newArrayList("broker-1", "broker-2");
}, Executors.newFixedThreadPool(1))).when(registry).getAvailableBrokersAsync();
}, executor)).when(registry).getAvailableBrokersAsync();
UnloadScheduler scheduler = new UnloadScheduler(pulsar, loadManagerExecutor, unloadManager, context,
channel, unloadStrategy, counter, reference);
@Cleanup("shutdownNow")
ExecutorService executorService = Executors.newFixedThreadPool(5);
CountDownLatch latch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand Down Expand Up @@ -361,6 +362,7 @@ public void testGetTopicPoliciesWithRetry() throws Exception {
TopicPolicies initPolicy = TopicPolicies.builder()
.maxConsumerPerTopic(10)
.build();
@Cleanup("shutdownNow")
ScheduledExecutorService executors = Executors.newScheduledThreadPool(1);
executors.schedule(new Runnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ public void testAsyncSendOrAckForSingleFuture() throws Exception {
int threadSize = 30;
String topicName = "subscription";
getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
@Cleanup("shutdownNow")
ExecutorService executorService = Executors.newFixedThreadPool(threadSize);

//build producer/consumer
Expand Down Expand Up @@ -1451,6 +1452,7 @@ public void testPendingAckBatchMessageCommit() throws Exception {
public void testPendingAckReplayChangeStateError() throws InterruptedException, TimeoutException {
AtomicInteger atomicInteger = new AtomicInteger(1);
// Create Executor
@Cleanup("shutdownNow")
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
// Mock serviceConfiguration.
ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
Expand Down Expand Up @@ -382,6 +383,7 @@ public void testUpdateSequenceIdInSyncCodeSegment() throws Exception {
int totalMessage = 200;
int threadSize = 5;
String topicName = "subscription";
@Cleanup("shutdownNow")
ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
conf.setBrokerDeduplicationEnabled(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ClientCnxTest extends MockedPulsarServiceBaseTest {
public static final String TENANT = "tnx";
public static final String NAMESPACE = TENANT + "/ns1";
public static String persistentTopic = "persistent://" + NAMESPACE + "/test";
ExecutorService executorService = Executors.newFixedThreadPool(20);
ExecutorService executorService;

@BeforeClass
@Override
Expand All @@ -54,13 +54,14 @@ protected void setup() throws Exception {
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE);
executorService = Executors.newFixedThreadPool(20);
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
this.executorService.shutdown();
this.executorService.shutdownNow();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,21 @@ public class ConnectionHandlerTest extends ProducerConsumerBase {
private static final Backoff BACKOFF = new BackoffBuilder().setInitialTime(1, TimeUnit.MILLISECONDS)
.setMandatoryStop(1, TimeUnit.SECONDS)
.setMax(3, TimeUnit.SECONDS).create();
private final ExecutorService executor = Executors.newFixedThreadPool(4);
private ExecutorService executor;

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
executor = Executors.newFixedThreadPool(4);
}

@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
executor.shutdown();
executor.shutdownNow();
}

@Test(timeOut = 30000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import lombok.Cleanup;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.Test;
Expand All @@ -37,6 +38,7 @@ public class RetryUtilTest {

@Test
public void testFailAndRetry() throws Exception {
@Cleanup("shutdownNow")
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
CompletableFuture<Boolean> callback = new CompletableFuture<>();
AtomicInteger atomicInteger = new AtomicInteger(0);
Expand All @@ -57,11 +59,11 @@ public void testFailAndRetry() throws Exception {
}, backoff, executor, callback);
assertTrue(callback.get());
assertEquals(atomicInteger.get(), 5);
executor.shutdownNow();
}

@Test
public void testFail() throws Exception {
@Cleanup("shutdownNow")
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
CompletableFuture<Boolean> callback = new CompletableFuture<>();
Backoff backoff = new BackoffBuilder()
Expand All @@ -79,6 +81,5 @@ public void testFail() throws Exception {
}
long time = System.currentTimeMillis() - start;
assertTrue(time >= 5000 - 2000, "Duration:" + time);
executor.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,7 @@ public void testCreateSchemaInParallel() throws Exception {
admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));

final String topic = getTopicName(ns, "testCreateSchemaInParallel");
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(16);
List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = new ArrayList<>(16);
CountDownLatch latch = new CountDownLatch(16);
Expand Down Expand Up @@ -1365,7 +1366,6 @@ public void testCreateSchemaInParallel() throws Exception {
});
producers.clear();
producers2.clear();
executor.shutdownNow();
}

@EqualsAndHashCode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@
@Test(groups = "websocket")
public class ProxyPublishConsumeClientSideEncryptionTest extends ProducerConsumerBase {
private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
private ScheduledExecutorService executor;
private static final Charset charset = Charset.defaultCharset();

private ProxyServer proxyServer;
private WebSocketService service;

@BeforeClass
public void setup() throws Exception {
executor = Executors.newScheduledThreadPool(1);

conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);

super.internalSetup();
Expand All @@ -92,6 +94,7 @@ protected void cleanup() throws Exception {
if (proxyServer != null) {
proxyServer.stop();
}
executor.shutdownNow();
log.info("Finished Cleaning Up Test setup");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public void testWaitForAny() {

public void testSequencer() {
int concurrentNum = 1000;
@Cleanup("shutdownNow")
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(concurrentNum);
final FutureUtil.Sequencer<Void> sequencer = FutureUtil.Sequencer.create();
// normal case -- allowExceptionBreakChain=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Cleanup;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand All @@ -41,15 +42,12 @@ public static Object[][] caDataProvider() {
public void testLoadCA(String path, int count) {
String caPath = Resources.getResource(path).getPath();

@Cleanup("shutdownNow")
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
try {
TrustManagerProxy trustManagerProxy =
new TrustManagerProxy(caPath, 120, scheduledExecutor);
X509Certificate[] x509Certificates = trustManagerProxy.getAcceptedIssuers();
assertNotNull(x509Certificates);
assertEquals(Arrays.stream(x509Certificates).count(), count);
} finally {
scheduledExecutor.shutdown();
}
TrustManagerProxy trustManagerProxy =
new TrustManagerProxy(caPath, 120, scheduledExecutor);
X509Certificate[] x509Certificates = trustManagerProxy.getAcceptedIssuers();
assertNotNull(x509Certificates);
assertEquals(Arrays.stream(x509Certificates).count(), count);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,48 @@
*/
package org.apache.bookkeeper.mledger.offload.filesystem;

import java.io.File;
import java.nio.file.Files;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;

import java.io.File;
import java.nio.file.Files;
import java.util.Properties;
import java.util.concurrent.Executors;

public abstract class FileStoreTestBase {
protected FileSystemManagedLedgerOffloader fileSystemManagedLedgerOffloader;
protected OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
protected OrderedScheduler scheduler;
protected final String basePath = "pulsar";
private MiniDFSCluster hdfsCluster;
private String hdfsURI;
protected LedgerOffloaderStats offloaderStats;
private ScheduledExecutorService scheduledExecutorService;

@BeforeClass(alwaysRun = true)
public final void beforeClass() throws Exception {
init();
}

public void init() throws Exception {
scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
}

@AfterClass(alwaysRun = true)
public final void afterClass() {
cleanup();
}

public void cleanup() {
scheduler.shutdownNow();
}

@BeforeMethod(alwaysRun = true)
public void start() throws Exception {
Expand All @@ -51,7 +71,8 @@ public void start() throws Exception {

hdfsURI = "hdfs://localhost:"+ hdfsCluster.getNameNodePort() + "/";
Properties properties = new Properties();
this.offloaderStats = LedgerOffloaderStats.create(true, true, Executors.newScheduledThreadPool(1), 60);
scheduledExecutorService = Executors.newScheduledThreadPool(1);
this.offloaderStats = LedgerOffloaderStats.create(true, true, scheduledExecutorService, 60);
fileSystemManagedLedgerOffloader = new FileSystemManagedLedgerOffloader(
OffloadPoliciesImpl.create(properties),
scheduler, hdfsURI, basePath, offloaderStats);
Expand All @@ -61,6 +82,7 @@ public void start() throws Exception {
public void tearDown() {
hdfsCluster.shutdown(true, true);
hdfsCluster.close();
scheduledExecutorService.shutdownNow();
}

public String getURI() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@
package org.apache.bookkeeper.mledger.offload.filesystem.impl;


import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
Expand All @@ -35,18 +43,9 @@
import org.apache.pulsar.common.naming.TopicName;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
private final PulsarMockBookKeeper bk;
private PulsarMockBookKeeper bk;
private String managedLedgerName = "public/default/persistent/testOffload";
private String topicName = TopicName.fromPersistenceNamingEncoding(managedLedgerName);
private String storagePath = createStoragePath(managedLedgerName);
Expand All @@ -55,7 +54,9 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
private final int numberOfEntries = 601;
private Map<String, String> map = new HashMap<>();

public FileSystemManagedLedgerOffloaderTest() throws Exception {
@Override
public void init() throws Exception {
super.init();
this.bk = new PulsarMockBookKeeper(scheduler);
this.toWrite = buildReadHandle();
map.put("ManagedLedgerName", managedLedgerName);
Expand Down
Loading

0 comments on commit cad4e75

Please sign in to comment.