Skip to content

Commit

Permalink
fix event bus instance leak
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <[email protected]>
  • Loading branch information
riccardomodanese authored and Coduz committed Apr 7, 2020
1 parent 0c1244b commit dcb65fa
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.eclipse.kapua.commons.event.ServiceEventBusManager;
import org.eclipse.kapua.commons.event.ServiceEventMarshaler;
import org.eclipse.kapua.commons.event.ServiceEventScope;
import org.eclipse.kapua.commons.metric.MetricServiceFactory;
import org.eclipse.kapua.commons.security.KapuaSecurityUtils;
import org.eclipse.kapua.commons.security.KapuaSession;
import org.eclipse.kapua.commons.setting.system.SystemSetting;
Expand All @@ -34,6 +35,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Counter;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
Expand All @@ -59,7 +62,7 @@
*
* @since 1.0
*/
public class JMSServiceEventBus implements ServiceEventBus, ServiceEventBusDriver, ExceptionListener {
public class JMSServiceEventBus implements ServiceEventBus, ServiceEventBusDriver {

private static final Logger LOGGER = LoggerFactory.getLogger(JMSServiceEventBus.class);

Expand All @@ -75,14 +78,19 @@ public class JMSServiceEventBus implements ServiceEventBus, ServiceEventBusDrive
private EventBusJMSConnectionBridge eventBusJMSConnectionBridge;
private ServiceEventMarshaler eventBusMarshaler;

private Counter reconnectionRetryCount;
private Counter connectionErrorCount;

/**
* Default constructor
*
* @throws JMSException
* @throws NamingException
*/
public JMSServiceEventBus() throws JMSException, NamingException {
eventBusJMSConnectionBridge = new EventBusJMSConnectionBridge(this);
reconnectionRetryCount = MetricServiceFactory.getInstance().getCounter("event_bus", "handler", "reconnection_retry", "count");
connectionErrorCount = MetricServiceFactory.getInstance().getCounter("event_bus", "handler", "connection_error", "count");
eventBusJMSConnectionBridge = new EventBusJMSConnectionBridge();
}

@Override
Expand Down Expand Up @@ -148,25 +156,6 @@ public ServiceEventBus getEventBus() {
return this;
}

@Override
public void onException(JMSException e) {
LOGGER.error("Connection to the event bus thrown exception: {}", e.getMessage(), e);
int i = 1;
while (true) {
LOGGER.info("EventBus restarting attempt... {}", i);
try {
restart();
LOGGER.info("EventBus restarting attempt... {} DONE", i);
LOGGER.info("EventBus connection RESTORED");
break;
} catch (ServiceEventBusException | JMSException e1) {
LOGGER.error("Cannot start new event bus connection... try again...", e1);
waitBeforeRetry();
}
i++;
}
}

private void waitBeforeRetry() {
// wait a bit
try {
Expand All @@ -176,31 +165,45 @@ private void waitBeforeRetry() {
}
}

private void restart() throws ServiceEventBusException, JMSException {
private synchronized void restart() throws ServiceEventBusException, JMSException {
// restart the event bus connection bridge with a new instance
// so no synchronization is needed
EventBusJMSConnectionBridge instanceToCleanUp = null;
EventBusJMSConnectionBridge newInstance = null;
try {
EventBusJMSConnectionBridge newInstance = new EventBusJMSConnectionBridge(this);
newInstance = new EventBusJMSConnectionBridge();
newInstance.start();
// restore subscriptions
for (Subscription subscription : subscriptionList) {
newInstance.subscribe(subscription);
}
instanceToCleanUp = eventBusJMSConnectionBridge;
eventBusJMSConnectionBridge = newInstance;
} catch (Throwable t) {
throw new ServiceEventBusException(t);
} catch (Exception e) {
LOGGER.warn("Error while creating new Service Event Bus instance: {}", e.getMessage());
//try to cleanup the messy instance
if (newInstance!=null) {
try {
LOGGER.warn("Stopping new Service Event Bus instance...");
newInstance.stop();
LOGGER.warn("Stopping new Service Event Bus instance... DONE");
}
catch(Exception e1) {
//don't throw this exception since the real exception is the first one
LOGGER.warn("Stopping new Service Event Bus instance error: {}", e1.getMessage(), e1);
}
}
throw new ServiceEventBusException(e);
} finally {
try {
if (instanceToCleanUp != null) {
LOGGER.info("Cleanup old JMSConnectionBridge instance...");
LOGGER.info("Stopping old Service Event Bus instance...");
instanceToCleanUp.stop();
} else {
LOGGER.warn("Null old JMSConnectionBridge instance. no cleanup will be done...");
LOGGER.warn("Stopping old Service Event Bus instance. Null instance found so nothig to do...");
}
} catch (ServiceEventBusException e) {
LOGGER.error("Cannot destroy old event bus connection: {}", e.getMessage(), e);
LOGGER.error("Stopping old Service Event Bus instance. Cannot destroy instance: {}", e.getMessage(), e);
} finally {
instanceToCleanUp = null;
}
Expand All @@ -211,10 +214,10 @@ private class EventBusJMSConnectionBridge {

private Connection jmsConnection;
private Map<String, SenderPool> senders = new HashMap<>();
private ExceptionListener exceptionListener;
private ExceptionListenerImpl exceptionListener;

public EventBusJMSConnectionBridge(ExceptionListener exceptionListener) {
this.exceptionListener = exceptionListener;
public EventBusJMSConnectionBridge() {
this.exceptionListener = new ExceptionListenerImpl();
}

void start() throws JMSException, NamingException, ServiceEventBusException {
Expand All @@ -239,6 +242,8 @@ void start() throws JMSException, NamingException, ServiceEventBusException {
void stop() throws ServiceEventBusException {
try {
if (jmsConnection != null) {
exceptionListener.stop();
jmsConnection.setExceptionListener(null);
jmsConnection.close();
}
} catch (JMSException e) {
Expand Down Expand Up @@ -424,6 +429,34 @@ public SenderPool(PooledSenderFactory factory) {

}

private class ExceptionListenerImpl implements ExceptionListener {

private boolean active = true;

@Override
public void onException(JMSException e) {
LOGGER.error("EventBus Listener {} - Connection thrown exception: {}", this, e.getMessage(), e);
connectionErrorCount.inc();
int i = 1;
while (active) {
LOGGER.info("EventBus Listener {} - restarting attempt... {}", this, i);
try {
reconnectionRetryCount.inc();
restart();
LOGGER.info("EventBus Listener {} - EventBus restarting attempt... {} DONE (Connection restored)", this, i);
break;
} catch (ServiceEventBusException | JMSException e1) {
LOGGER.error("EventBus Listener {} - Cannot start new event bus connection... try again...", this, e1);
waitBeforeRetry();
}
i++;
}
}

public void stop() {
active = false;
}
}
}

private class Subscription {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.kapua.commons.setting.system.SystemSettingKey;
//import org.eclipse.kapua.commons.setting.system.SystemSettingKey;
import org.eclipse.kapua.qa.common.DBHelper;
import org.eclipse.kapua.qa.common.Suppressed;
import org.elasticsearch.common.UUIDs;
Expand Down Expand Up @@ -65,7 +67,10 @@ public void start() {
if (NO_EMBEDDED_SERVERS) {
return;
}
System.setProperty(SystemSettingKey.EVENT_BUS_URL.key(), "amqp://127.0.0.1:5672");
//set a default value if not set
if (StringUtils.isEmpty(System.getProperty(SystemSettingKey.EVENT_BUS_URL.key()))) {
System.setProperty(SystemSettingKey.EVENT_BUS_URL.key(), "amqp://127.0.0.1:5672");
}
database.setup();

logger.info("Starting new instance of Event Broker");
Expand Down

0 comments on commit dcb65fa

Please sign in to comment.