Skip to content

Commit

Permalink
ARTEMIS-4436 Artemis is logging warnings during clean shutdown of server
Browse files Browse the repository at this point in the history
in cluster.

When we know that a node leaves a clustercleanly we shouldn't log WARN
messages about it.

Signed-off-by: Emmanuel Hugonnet <[email protected]>
  • Loading branch information
ehsavoie committed Jan 4, 2024
1 parent 7456e64 commit a1f3db9
Show file tree
Hide file tree
Showing 13 changed files with 141 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,7 @@ public interface ActiveMQClientLogger {

@LogMessage(id = 214035, value = "Couldn't finish the client globalFlowControlThreadPool in less than 10 seconds, interrupting it now", level = LogMessage.Level.WARN)
void unableToProcessGlobalFlowControlThreadPoolIn10Sec();

@LogMessage(id = 214036, value = "Connection closure to {} has been detected: {} [code={}]", level = LogMessage.Level.INFO)
void connectionClosureDetected(String remoteAddress, String message, ActiveMQExceptionType type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -483,15 +483,15 @@ public void causeExit() {
}

private void interruptConnectAndCloseAllSessions(boolean close) {
//release all threads waiting for topology
latchFinalTopology.countDown();

clientProtocolManager.stop();

synchronized (createSessionLock) {
closeCleanSessions(close);
closed = true;
}

//release all threads waiting for topology
latchFinalTopology.countDown();
}

/**
Expand Down Expand Up @@ -544,7 +544,9 @@ public boolean waitForTopology(long timeout, TimeUnit unit) {
return latchFinalTopology.await(timeout, unit) && topologyReady;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
ActiveMQClientLogger.LOGGER.unableToReceiveClusterTopology(e);
if (!isClosed()) {
ActiveMQClientLogger.LOGGER.unableToReceiveClusterTopology(e);
}
return false;
}
}
Expand Down Expand Up @@ -1547,7 +1549,7 @@ public void nodeDisconnected(RemotingConnection conn, String nodeID, DisconnectR
serverLocator, nodeID, reason, new Exception("trace"));
}

serverLocator.notifyNodeDown(System.currentTimeMillis(), nodeID);
serverLocator.notifyNodeDown(System.currentTimeMillis(), nodeID, true);

if (reason.isRedirect()) {
if (serverLocator.isHA()) {
Expand Down Expand Up @@ -1596,8 +1598,8 @@ public void notifyNodeUp(long uniqueEventID,
}

@Override
public void notifyNodeDown(long eventTime, String nodeID) {
serverLocator.notifyNodeDown(eventTime, nodeID);
public void notifyNodeDown(long eventTime, String nodeID, boolean disconnect) {
serverLocator.notifyNodeDown(eventTime, nodeID, disconnect);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1500,7 +1500,7 @@ private void doClose(final boolean sendClose) {
* Look for callers of this method!
*/
@Override
public void notifyNodeDown(final long eventTime, final String nodeID) {
public void notifyNodeDown(final long eventTime, final String nodeID, boolean disconnect) {

if (!ha) {
// there's no topology here
Expand All @@ -1511,7 +1511,7 @@ public void notifyNodeDown(final long eventTime, final String nodeID) {
logger.trace("nodeDown {} nodeID={} as being down", this, nodeID, new Exception("trace"));
}

topology.removeMember(eventTime, nodeID);
topology.removeMember(eventTime, nodeID, disconnect);

if (clusterConnection) {
updateArraysAndPairs(eventTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ void notifyNodeUp(long uniqueEventID,
* @param uniqueEventID 0 means get the previous ID +1
* @param nodeID
*/
void notifyNodeDown(long uniqueEventID, String nodeID);
default void notifyNodeDown(long uniqueEventID, String nodeID) {
notifyNodeDown(uniqueEventID, nodeID, false);
}
void notifyNodeDown(long uniqueEventID, String nodeID, boolean disconnect);

ServerLocatorInternal setClusterConnection(boolean clusterConnection);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,11 @@ private ArrayList<ClusterTopologyListener> copyListeners() {
return listenersCopy;
}

boolean removeMember(final long uniqueEventID, final String nodeId) {
boolean removeMember(final long uniqueEventID, final String nodeId, final boolean disconnect) {
TopologyMemberImpl member;


if (manager != null && !manager.removeMember(uniqueEventID, nodeId)) {
if (manager != null && !manager.removeMember(uniqueEventID, nodeId, disconnect)) {
logger.debug("TopologyManager rejected the update towards {}", nodeId);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@

public interface TopologyManager {
boolean updateMember(long uniqueEventID, String nodeId, TopologyMemberImpl memberInput);
boolean removeMember(long uniqueEventID, String nodeId);
boolean removeMember(long uniqueEventID, String nodeId, boolean disconnect);
}
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ protected void notifyTopologyChange(final ClusterTopologyChangeMessage topMessag
logger.debug("Notifying {} going down", topMessage.getNodeID());

if (topologyResponseHandler != null) {
topologyResponseHandler.notifyNodeDown(eventUID, topMessage.getNodeID());
topologyResponseHandler.notifyNodeDown(eventUID, topMessage.getNodeID(), topMessage.isExit());
}
} else {
Pair<TransportConfiguration, TransportConfiguration> transportConfig = topMessage.getPair();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;

public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection {

Expand Down Expand Up @@ -198,8 +199,10 @@ public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
destroyed = true;
}

if (!(me instanceof ActiveMQRemoteDisconnectException) && !(me instanceof ActiveMQRoutingException)) {
if (!(me instanceof ActiveMQRemoteDisconnectException) && !(me instanceof ActiveMQRoutingException) && !(me instanceof ActiveMQDisconnectedException)) {
ActiveMQClientLogger.LOGGER.connectionFailureDetected(transportConnection.getRemoteAddress(), me.getMessage(), me.getType());
} else if (me instanceof ActiveMQDisconnectedException) {
ActiveMQClientLogger.LOGGER.connectionClosureDetected(transportConnection.getRemoteAddress(), me.getMessage(), me.getType());
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ void notifyNodeUp(long uniqueEventID,
boolean isLast);

// This is sent when any node on the cluster topology is going down
void notifyNodeDown(long eventTime, String nodeID);
void notifyNodeDown(long eventTime, String nodeID, boolean disconnect);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1602,4 +1602,10 @@ void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,

@LogMessage(id = 224133, value = "{} orphaned page transactions have been removed", level = LogMessage.Level.INFO)
void cleaningOrphanedTXCleanup(long numberOfPageTx);

@LogMessage(id = 224134, value = "Connection closed with failedOver={}", level = LogMessage.Level.INFO)
void bridgeConnectionClosed(Boolean failedOver);

@LogMessage(id = 224135, value = "nodeID {} is closing. Topology update ignored", level = LogMessage.Level.INFO)
void nodeLeavingCluster(String nodeID);
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;

/**
* A Core BridgeImpl
Expand Down Expand Up @@ -661,7 +662,11 @@ public void connectionFailed(final ActiveMQException me, boolean failedOver) {
@Override
public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
if (server.isStarted()) {
ActiveMQServerLogger.LOGGER.bridgeConnectionFailed(failedOver);
if (me instanceof ActiveMQDisconnectedException) {
ActiveMQServerLogger.LOGGER.bridgeConnectionClosed(failedOver);
} else {
ActiveMQServerLogger.LOGGER.bridgeConnectionFailed(failedOver);
}
}

synchronized (connectionGuard) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,9 +554,13 @@ public boolean updateMember(long uniqueEventID, String nodeId, TopologyMemberImp
* @return
*/
@Override
public boolean removeMember(final long uniqueEventID, final String nodeId) {
public boolean removeMember(final long uniqueEventID, final String nodeId, final boolean disconnect) {
if (nodeId.equals(nodeManager.getNodeId().toString())) {
ActiveMQServerLogger.LOGGER.possibleSplitBrain(nodeId);
if (!disconnect) {
ActiveMQServerLogger.LOGGER.possibleSplitBrain(nodeId);
} else {
ActiveMQServerLogger.LOGGER.nodeLeavingCluster(nodeId);
}
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.warnings;

import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterCleanNodeShutdownTest extends ClusterTestBase {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Test
public void testNoWarningErrorsDuringRestartingNodesInCluster() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());

setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);

startServers(0, 1);
Wait.assertTrue(() -> {
getServer(0).getClusterManager().getClusterController().awaitConnectionToReplicationCluster();
return true;
}, 2000L);
Wait.assertTrue(() -> {
getServer(1).getClusterManager().getClusterController().awaitConnectionToReplicationCluster();
return true;
}, 2000L);

logger.debug("server 0 = {}", getServer(0).getNodeID());
logger.debug("server 1 = {}", getServer(1).getNodeID());

setupSessionFactory(0, isNetty(), 15);
setupSessionFactory(1, isNetty());

// now create the 2 queues and make sure they are durable
createQueue(0, "queues.testaddress", "queue10", null, true);
createQueue(1, "queues.testaddress", "queue10", null, true);

addConsumer(0, 0, "queue10", null);

waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);

waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);

sendInRange(1, "queues.testaddress", 0, 10, true, null);
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler(true)) {
logger.debug("*****************************************************************************");
stopServers(0);
// Waiting some time after stopped
Wait.assertTrue(() -> !getServer(0).isStarted() && !getServer(0).isActive(), 2000L);
logger.debug("*****************************************************************************");
Assert.assertFalse("Connection failure detected for an expected DISCONNECT event", loggerHandler.findText("AMQ212037", " [code=DISCONNECTED]"));
Assert.assertFalse("WARN found", loggerHandler.hasLevel(AssertionLoggerHandler.LogLevel.WARN));
}
startServers(0);

waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);

waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);

sendInRange(1, "queues.testaddress", 10, 20, false, null);

verifyReceiveAllInRange(0, 20, 0);
logger.debug("*****************************************************************************");
}

public boolean isNetty() {
return true;
}
}

0 comments on commit a1f3db9

Please sign in to comment.