diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java index e5ab0f80a832c..014e2bf642a4d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/NodeJoinLeftIT.java @@ -33,13 +33,9 @@ package org.opensearch.cluster.coordination; import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; -import org.apache.logging.log4j.core.config.Property; -import org.apache.logging.log4j.core.layout.PatternLayout; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.cluster.NodeConnectionsService; import org.opensearch.cluster.metadata.IndexMetadata; @@ -53,6 +49,7 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; import org.opensearch.test.OpenSearchIntegTestCase.Scope; +import org.opensearch.test.TestLogsAppender; import org.opensearch.test.store.MockFSIndexStore; import org.opensearch.test.transport.MockTransportService; import org.opensearch.test.transport.StubbableTransport; @@ -64,12 +61,11 @@ import org.junit.After; import org.junit.Before; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.regex.Pattern; import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME; import static org.hamcrest.Matchers.is; @@ -81,7 +77,7 @@ @ClusterScope(scope 
= Scope.TEST, numDataNodes = 0) public class NodeJoinLeftIT extends OpenSearchIntegTestCase { - private TestAppender testAppender; + private TestLogsAppender testLogsAppender; private String clusterManager; private String redNodeName; private LoggerContext loggerContext; @@ -108,11 +104,13 @@ protected void beforeIndexDeletion() throws Exception { @Before public void setUp() throws Exception { super.setUp(); - testAppender = new TestAppender(); + // Add any other specific messages you want to capture + List messagesToCapture = Arrays.asList("failed to join", "IllegalStateException"); + testLogsAppender = new TestLogsAppender(messagesToCapture); loggerContext = (LoggerContext) LogManager.getContext(false); Configuration config = loggerContext.getConfiguration(); LoggerConfig loggerConfig = config.getLoggerConfig(ClusterConnectionManager.class.getName()); - loggerConfig.addAppender(testAppender, null, null); + loggerConfig.addAppender(testLogsAppender, null, null); loggerContext.updateLoggers(); String indexName = "test"; @@ -148,10 +146,11 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { + testLogsAppender.clearCapturedLogs(); loggerContext = (LoggerContext) LogManager.getContext(false); Configuration config = loggerContext.getConfiguration(); LoggerConfig loggerConfig = config.getLoggerConfig(ClusterConnectionManager.class.getName()); - loggerConfig.removeAppender(testAppender.getName()); + loggerConfig.removeAppender(testLogsAppender.getName()); loggerContext.updateLoggers(); super.tearDown(); } @@ -190,9 +189,9 @@ public void testClusterStabilityWhenJoinRequestHappensDuringNodeLeftTask() throw ); redTransportService.addRequestHandlingBehavior(FOLLOWER_CHECK_ACTION_NAME, simulatedFailureBehaviour); - // Loop runs 10 times to ensure race condition gets reproduced - for (int i = 0; i < 10; i++) { - // Fail followerchecker by force to trigger node disconnect and node left + // Loop runs 5 times to ensure race condition gets 
reproduced + testLogsAppender.clearCapturedLogs(); + for (int i = 0; i < 5; i++) { logger.info("--> simulating followerchecker failure to trigger node-left"); succeedFollowerChecker.set(false); ClusterHealthResponse response1 = client().admin().cluster().prepareHealth().setWaitForNodes("2").get(); @@ -214,12 +213,14 @@ public void testClusterStabilityWhenJoinRequestHappensDuringNodeLeftTask() throw ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); assertThat(response.isTimedOut(), is(false)); - // assert that the right exception message showed up in logs - assertTrue( - "Expected IllegalStateException was not logged", - testAppender.containsExceptionMessage("IllegalStateException[cannot make a new connection as disconnect to node") - ); - + // assert that join requests fail with the right exception + boolean logFound = testLogsAppender.waitForLog("failed to join", 30, TimeUnit.SECONDS) + && testLogsAppender.waitForLog( + "IllegalStateException[cannot make a new connection as disconnect to node", + 30, + TimeUnit.SECONDS + ); + assertTrue("Expected log was not found within the timeout period", logFound); } public void testClusterStabilityWhenDisconnectDuringSlowNodeLeftTask() throws Exception { @@ -259,8 +260,9 @@ public void testClusterStabilityWhenDisconnectDuringSlowNodeLeftTask() throws Ex ); redTransportService.addRequestHandlingBehavior(FOLLOWER_CHECK_ACTION_NAME, simulatedFailureBehaviour); - // Loop runs 10 times to ensure race condition gets reproduced - for (int i = 0; i < 10; i++) { + // Loop runs 5 times to ensure race condition gets reproduced + testLogsAppender.clearCapturedLogs(); + for (int i = 0; i < 5; i++) { // Fail followerchecker by force to trigger node disconnect and node left logger.info("--> simulating followerchecker failure to trigger node-left"); succeedFollowerChecker.set(false); @@ -291,11 +293,15 @@ public void testClusterStabilityWhenDisconnectDuringSlowNodeLeftTask() throws Ex 
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); assertThat(response.isTimedOut(), is(false)); - // assert that the right exception message showed up in logs - assertTrue( - "Expected IllegalStateException was not logged", - testAppender.containsExceptionMessage("IllegalStateException[cannot make a new connection as disconnect to node") + // assert that join requests fail with the right exception + boolean logFound = testLogsAppender.waitForLog("failed to join", 30, TimeUnit.SECONDS); + assertTrue("Expected log was not found within the timeout period", logFound); + logFound = testLogsAppender.waitForLog( + "IllegalStateException[cannot make a new connection as disconnect to node", + 30, + TimeUnit.SECONDS ); + assertTrue("Expected log was not found within the timeout period", logFound); } public void testRestartDataNode() throws Exception { @@ -346,29 +352,4 @@ public void messageReceived( handler.messageReceived(request, channel, task); } } - - private static class TestAppender extends AbstractAppender { - private final List logs = new ArrayList<>(); - - TestAppender() { - super("TestAppender", null, PatternLayout.createDefaultLayout(), false, Property.EMPTY_ARRAY); - start(); - } - - @Override - public void append(LogEvent event) { - logs.add(event.getMessage().getFormattedMessage()); - if (event.getThrown() != null) { - logs.add(event.getThrown().toString()); - for (StackTraceElement element : event.getThrown().getStackTrace()) { - logs.add(element.toString()); - } - } - } - - boolean containsExceptionMessage(String exceptionMessage) { - Pattern pattern = Pattern.compile(Pattern.quote(exceptionMessage), Pattern.CASE_INSENSITIVE); - return logs.stream().anyMatch(log -> pattern.matcher(log).find()); - } - } } diff --git a/server/src/test/java/org/opensearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/opensearch/cluster/NodeConnectionsServiceTests.java index cbae19dab01fa..2958ac50c6a82 
100644 --- a/server/src/test/java/org/opensearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/NodeConnectionsServiceTests.java @@ -35,6 +35,9 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.LoggerConfig; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.Version; import org.opensearch.action.support.PlainActionFuture; @@ -53,9 +56,11 @@ import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.TestLogsAppender; import org.opensearch.test.junit.annotations.TestLogging; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.ClusterConnectionManager; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.ConnectionProfile; import org.opensearch.transport.Transport; @@ -69,6 +74,7 @@ import org.junit.Before; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -94,6 +100,8 @@ public class NodeConnectionsServiceTests extends OpenSearchTestCase { private ThreadPool threadPool; private TransportService transportService; private Map> nodeConnectionBlocks; + private TestLogsAppender testLogsAppender; + private LoggerContext loggerContext; private List generateNodes() { List nodes = new ArrayList<>(); @@ -516,7 +524,7 @@ public void testConnectionCheckerRetriesIfPendingDisconnection() throws Interrup // setup the connections final DiscoveryNode node = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); - ; + final DiscoveryNodes nodes = 
DiscoveryNodes.builder().add(node).build(); final AtomicBoolean connectionCompleted = new AtomicBoolean(); @@ -526,13 +534,15 @@ public void testConnectionCheckerRetriesIfPendingDisconnection() throws Interrup // now trigger a disconnect, and then set pending disconnections to true to fail any new connections final long maxDisconnectionTime = 1000; - final long disconnectionTime = 100; - deterministicTaskQueue.scheduleAt(disconnectionTime, new Runnable() { + deterministicTaskQueue.scheduleNow(new Runnable() { @Override public void run() { transportService.disconnectFromNode(node); logger.info("--> setting pending disconnections to fail next connection attempts"); service.setPendingDisconnections(new HashSet<>(Collections.singleton(node))); + // we reset the connection count during the first disconnection + // we also clear the captured logs as we want to assert for exceptions that show up after this + testLogsAppender.clearCapturedLogs(); transportService.resetConnectToNodeCallCount(); } @@ -541,20 +551,39 @@ public String toString() { return "scheduled disconnection of " + node; } }); + final long maxReconnectionTime = 2000; + final int expectedReconnectionAttempts = 5; - // ensure the disconnect task completes, give extra time also for connection checker tasks - runTasksUntil(deterministicTaskQueue, maxDisconnectionTime); - - // verify that connectionchecker is trying to call connectToNode multiple times + // ensure the disconnect task completes, and run for additional time to check for reconnections + // exit early if we see enough reconnection attempts logger.info("--> verifying connectionchecker is trying to reconnect"); + runTasksUntilExpectedReconnectionAttempts( + deterministicTaskQueue, + maxDisconnectionTime + maxReconnectionTime, + transportService, + expectedReconnectionAttempts + ); + + // assert that we saw at least the required number of reconnection attempts, and the exceptions that showed up are as expected logger.info("--> number of reconnection 
attempts: {}", transportService.getConnectToNodeCallCount()); - assertThat("ConnectToNode should be called multiple times", transportService.getConnectToNodeCallCount(), greaterThan(5)); + assertThat( + "Did not see enough reconnection attempts from connection checker", + transportService.getConnectToNodeCallCount(), + greaterThan(expectedReconnectionAttempts) + ); + boolean logFound = testLogsAppender.waitForLog("failed to connect", 1, TimeUnit.SECONDS) + && testLogsAppender.waitForLog( + "IllegalStateException: cannot make a new connection as disconnect to node", + 1, + TimeUnit.SECONDS + ); + assertTrue("Expected log for reconnection failure was not found in the required time period", logFound); assertFalse("connected to " + node, transportService.nodeConnected(node)); // clear the pending disconnections and ensure the connection gets re-established automatically by connectionchecker logger.info("--> clearing pending disconnections to allow connections to re-establish"); service.clearPendingDisconnections(); - runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + 2 * reconnectIntervalMillis); + runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + maxReconnectionTime + 2 * reconnectIntervalMillis); assertConnectedExactlyToNodes(transportService, nodes); } @@ -569,6 +598,24 @@ private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long e deterministicTaskQueue.runAllRunnableTasks(); } + private void runTasksUntilExpectedReconnectionAttempts( + DeterministicTaskQueue deterministicTaskQueue, + long endTimeMillis, + TestTransportService transportService, + int expectedReconnectionAttempts + ) { + // break the loop if we timeout or if we have enough reconnection attempts + while ((deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) + && (transportService.getConnectToNodeCallCount() <= expectedReconnectionAttempts)) { + if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) { + 
deterministicTaskQueue.runRandomTask(); + } else if (deterministicTaskQueue.hasDeferredTasks()) { + deterministicTaskQueue.advanceTime(); + } + } + deterministicTaskQueue.runAllRunnableTasks(); + } + private void ensureConnections(NodeConnectionsService service) { final PlainActionFuture future = new PlainActionFuture<>(); service.ensureConnections(() -> future.onResponse(null)); @@ -594,6 +641,16 @@ private void assertConnected(TransportService transportService, Iterable messagesToCapture = Arrays.asList("failed to connect", "IllegalStateException"); + testLogsAppender = new TestLogsAppender(messagesToCapture); + loggerContext = (LoggerContext) LogManager.getContext(false); + Configuration config = loggerContext.getConfiguration(); + LoggerConfig loggerConfig = config.getLoggerConfig(NodeConnectionsService.class.getName()); + loggerConfig.addAppender(testLogsAppender, null, null); + loggerConfig = config.getLoggerConfig(ClusterConnectionManager.class.getName()); + loggerConfig.addAppender(testLogsAppender, null, null); + loggerContext.updateLoggers(); ThreadPool threadPool = new TestThreadPool(getClass().getName()); this.threadPool = threadPool; nodeConnectionBlocks = newConcurrentMap(); @@ -605,6 +662,14 @@ public void setUp() throws Exception { @Override @After public void tearDown() throws Exception { + testLogsAppender.clearCapturedLogs(); + loggerContext = (LoggerContext) LogManager.getContext(false); + Configuration config = loggerContext.getConfiguration(); + LoggerConfig loggerConfig = config.getLoggerConfig(NodeConnectionsService.class.getName()); + loggerConfig.removeAppender(testLogsAppender.getName()); + loggerConfig = config.getLoggerConfig(ClusterConnectionManager.class.getName()); + loggerConfig.removeAppender(testLogsAppender.getName()); + loggerContext.updateLoggers(); transportService.stop(); ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); threadPool = null; diff --git 
a/test/framework/src/main/java/org/opensearch/test/TestLogsAppender.java b/test/framework/src/main/java/org/opensearch/test/TestLogsAppender.java
new file mode 100644
index 0000000000000..030f399a5bcc0
--- /dev/null
+++ b/test/framework/src/main/java/org/opensearch/test/TestLogsAppender.java
@@ -0,0 +1,129 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.test;
+
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Log4j appender for tests that records only the log lines containing one of a configured set of
+ * substrings and lets a test poll until an expected line has been recorded.
+ * <p>
+ * {@link #append(LogEvent)} is expected to be invoked from logging threads while a test thread polls
+ * the recorded lines, so the lines are kept in a thread-safe list.
+ *
+ * @opensearch.internal
+ */
+public class TestLogsAppender extends AbstractAppender {
+    /** Pause between two checks of the recorded lines in {@link #waitForLog(String, long, TimeUnit)}. */
+    private static final long POLL_INTERVAL_MILLIS = 100;
+
+    private final List<String> capturedLogs = new CopyOnWriteArrayList<>();
+    private final List<String> messagesToCapture;
+
+    /**
+     * Creates and starts the appender.
+     *
+     * @param messagesToCapture substrings to look for; a line is recorded when it contains at least one of them
+     */
+    public TestLogsAppender(List<String> messagesToCapture) {
+        super("TestAppender", null, PatternLayout.createDefaultLayout(), false, Property.EMPTY_ARRAY);
+        // snapshot the filter so later changes to the caller's list cannot race with append()
+        this.messagesToCapture = Collections.unmodifiableList(new ArrayList<>(messagesToCapture));
+        start();
+    }
+
+    /**
+     * Records the formatted message and, when a throwable is attached, its {@code toString()} and each of its
+     * stack trace elements, keeping only the lines accepted by {@link #shouldCaptureMessage(String)}.
+     *
+     * @param event the log event to inspect
+     */
+    @Override
+    public void append(LogEvent event) {
+        captureIfMatching(event.getMessage().getFormattedMessage());
+        final Throwable thrown = event.getThrown();
+        if (thrown != null) {
+            captureIfMatching(thrown.toString());
+            for (StackTraceElement element : thrown.getStackTrace()) {
+                captureIfMatching(element.toString());
+            }
+        }
+    }
+
+    /**
+     * Tells whether a line matches the capture filter.
+     *
+     * @param log candidate line
+     * @return {@code true} if the line contains at least one of the configured substrings
+     */
+    public boolean shouldCaptureMessage(String log) {
+        return messagesToCapture.stream().anyMatch(log::contains);
+    }
+
+    /**
+     * @return a copy of the lines recorded so far
+     */
+    public List<String> getCapturedLogs() {
+        return new ArrayList<>(capturedLogs);
+    }
+
+    /**
+     * Polls the recorded lines until one contains {@code expectedLog} or the timeout elapses. The lines are
+     * checked at least once, even with a zero timeout.
+     *
+     * @param expectedLog substring to look for in the recorded lines
+     * @param timeout maximum time to wait
+     * @param unit unit of {@code timeout}
+     * @return {@code true} if a matching line was recorded; {@code false} on timeout or if the calling thread
+     *         was interrupted (its interrupt flag is restored)
+     */
+    public boolean waitForLog(String expectedLog, long timeout, TimeUnit unit) {
+        // nanoTime is monotonic, so wall-clock adjustments cannot shorten or stretch the wait
+        final long timeoutNanos = unit.toNanos(timeout);
+        final long startNanos = System.nanoTime();
+        while (true) {
+            if (capturedLogs.stream().anyMatch(log -> log.contains(expectedLog))) {
+                return true;
+            }
+            if (System.nanoTime() - startNanos >= timeoutNanos) {
+                return false;
+            }
+            try {
+                Thread.sleep(POLL_INTERVAL_MILLIS);
+            } catch (InterruptedException e) {
+                // give up: with the flag restored every further sleep would throw at once and the loop would spin
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Discards every line recorded so far.
+     */
+    public void clearCapturedLogs() {
+        capturedLogs.clear();
+    }
+
+    // Records the line if it passes the capture filter.
+    private void captureIfMatching(String line) {
+        if (shouldCaptureMessage(line)) {
+            capturedLogs.add(line);
+        }
+    }
+}