Skip to content

Commit

Permalink
Merge pull request #327 from NOAA-OWP/issue326
Browse files Browse the repository at this point in the history
Create basic happy and sad path tests for broker connections, #326.
  • Loading branch information
james-d-brown authored Oct 1, 2024
2 parents b216650 + 24e42e7 commit 960cd03
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 60 deletions.
124 changes: 79 additions & 45 deletions wres-events/src/wres/events/broker/BrokerConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public class BrokerConnectionFactory implements Supplier<Connection>
/** The maximum number of times a message can be resent on failure. */
private final Integer maximumMessageRetries;

/** Context that maps JMS objects to names. */
private final Context context;

/** A connection factory. */
private final ConnectionFactory connectionFactory;

/** The connection properties. */
private final Properties properties;

/**
* <p>Returns an instance of a factory, which is created with supplied properties and a default number of message
* retries, {@link #DEFAULT_MAXIMUM_MESSAGE_RETRIES}.
Expand Down Expand Up @@ -120,7 +120,18 @@ public Destination getDestination( String name ) throws NamingException
{
Objects.requireNonNull( name );

return ( Destination ) this.context.lookup( name );
String connectionPropertyName = BrokerUtilities.getConnectionPropertyName( this.properties );
BrokerUtilities.testConnectionProperty( connectionPropertyName, this.properties );
String connectionUrl = this.properties.getProperty( connectionPropertyName );
Context context = BrokerConnectionFactory.getContextFromProperties( connectionUrl, this.properties );
try
{
return ( Destination ) context.lookup( name );
}
finally
{
BrokerConnectionFactory.closeContext( context );
}
}

/**
Expand Down Expand Up @@ -212,26 +223,55 @@ static void testConnection( Properties properties, int retries )
}
finally
{
// Close any connection that succeeded
if ( Objects.nonNull( connection ) )
{
LOGGER.info( "Established a connection to an AMQP message broker at binding URL: {}.",
connectionUrl );

try
{
connection.close();
}
catch ( JMSException e )
{
LOGGER.error( "Failed to close an attempted connection during the instantiation of a "
+ "connection factory." );
}
}
BrokerConnectionFactory.closeConnection( connection, connectionUrl );
BrokerConnectionFactory.closeContext( localContext );
}
}
}

/**
* Closes a connection.
*
* @param connection the connection, possibly null
* @param connectionUrl the connection URL for logging
*/
private static void closeConnection( Connection connection, String connectionUrl )
{
// Close any connection that succeeded
if ( Objects.nonNull( connection ) )
{
LOGGER.info( "Established a connection to an AMQP message broker at binding URL: {}.",
connectionUrl );

try
{
connection.close();
}
catch ( JMSException e )
{
LOGGER.error( "Failed to close an attempted connection during the instantiation of a "
+ "connection factory. The correction URL is: {}.", connectionUrl );
}
}
}

/**
* Closes the supplied context.
* @param context the context
*/

private static void closeContext( Context context )
{
try
{
context.close();
}
catch ( NamingException e )
{
LOGGER.error( "Failed to close the context for a connection." );
}
}

/**
* Returns the context from the properties.
* @param connectionUrl the connection string to help with exception messaging
Expand Down Expand Up @@ -313,41 +353,35 @@ private BrokerConnectionFactory( Properties properties,
}

this.maximumMessageRetries = maximumMessageRetries;
this.properties = properties;

// The connection property name
String connectionPropertyName = BrokerUtilities.getConnectionPropertyName( properties );

// Set any variables that depend on the (possibly adjusted) properties
try
{
LOGGER.debug( "Creating a connection factory with these properties: {}.", properties );

// Test
String connectionString = properties.getProperty( connectionPropertyName );
this.context = new InitialContext( properties );
this.connectionFactory = BrokerConnectionFactory.getConnectionFactory( connectionString,
this.context,
connectionPropertyName );
LOGGER.debug( "Creating a connection factory with these properties: {}.", properties );

LOGGER.debug( "Testing the connection property {} with corresponding connection string {}.",
connectionPropertyName,
connectionString );
// Test
String connectionString = properties.getProperty( connectionPropertyName );
Context context = BrokerConnectionFactory.getContextFromProperties( connectionString, properties );
this.connectionFactory = BrokerConnectionFactory.getConnectionFactory( connectionString,
context,
connectionPropertyName );

BrokerConnectionFactory.testConnection( properties,
BrokerConnectionFactory.MAXIMUM_CONNECTION_RETRIES );
LOGGER.debug( "Testing the connection property {} with corresponding connection string {}.",
connectionPropertyName,
connectionString );

if ( LOGGER.isInfoEnabled() )
{
LOGGER.info( "Created a broker connection factory {} with name {} and binding URL {}.",
this,
connectionPropertyName,
properties.getProperty( connectionPropertyName ) );
}
}
catch ( NamingException e )
BrokerConnectionFactory.testConnection( properties,
BrokerConnectionFactory.MAXIMUM_CONNECTION_RETRIES );

if ( LOGGER.isInfoEnabled() )
{
throw new CouldNotLoadBrokerConfigurationException( "Unable to load the expected broker configuration.",
e );
LOGGER.info( "Created a broker connection factory {} with name {} and binding URL {}.",
this,
connectionPropertyName,
properties.getProperty( connectionPropertyName ) );
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package wres.events.broker;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
Expand Down Expand Up @@ -38,17 +39,17 @@ class BrokerConnectionFactoryTest
void testMessageRouting() throws IOException, NamingException, JMSException, InterruptedException
{
Properties properties = BrokerUtilities.getBrokerConnectionProperties( "eventbroker.properties" );

// Create and start the broker, clean up on completion
try ( EmbeddedBroker ignored = EmbeddedBroker.of( properties, true ) )
try ( EmbeddedBroker ignored = EmbeddedBroker.of( properties, true ) )
{
BrokerConnectionFactory factory = BrokerConnectionFactory.of( properties, 2 );
Topic evaluationTopic = (Topic) factory.getDestination( "evaluation" );
Topic evaluationStatusTopic = (Topic) factory.getDestination( "status" );
Topic statisticsTopic = (Topic) factory.getDestination( "statistics" );
Topic evaluationTopic = ( Topic ) factory.getDestination( "evaluation" );
Topic evaluationStatusTopic = ( Topic ) factory.getDestination( "status" );
Topic statisticsTopic = ( Topic ) factory.getDestination( "statistics" );

String evaluationId = "1234567";

// Application/JMS-level message selection based on correlation id
String messageSelector = "JMSCorrelationID = '" + evaluationId + "'";

Expand All @@ -69,8 +70,8 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int

// Listen for evaluation messages
MessageListener evaluationListener = message -> {
TextMessage textMessage = (TextMessage) message;
TextMessage textMessage = ( TextMessage ) message;

try
{
assertEquals( "I am an evaluation message!", textMessage.getText() );
Expand All @@ -79,15 +80,15 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int
{
throw new IllegalStateException( e );
}

LOGGER.info( "Received an evaluation message {}", textMessage );
evaluationConsumerCount.countDown();
};

// Listen for evaluation status messages
MessageListener evaluationStatusListener = message -> {
TextMessage textMessage = (TextMessage) message;
TextMessage textMessage = ( TextMessage ) message;

try
{
assertEquals( "I am an evaluation status message!", textMessage.getText() );
Expand All @@ -96,15 +97,15 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int
{
throw new IllegalStateException( e );
}

LOGGER.info( "Received an evaluation status message {}", textMessage );
evaluationStatusConsumerCount.countDown();
};

// Listen for statistics messages
MessageListener evaluationStatisticsListener = message -> {
TextMessage textMessage = (TextMessage) message;
TextMessage textMessage = ( TextMessage ) message;

try
{
assertEquals( "I am a statistics message!", textMessage.getText() );
Expand All @@ -113,7 +114,7 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int
{
throw new IllegalStateException( e );
}

LOGGER.info( "Received a statistics message {}", textMessage );
statisticsConsumerCount.countDown();
};
Expand Down Expand Up @@ -148,5 +149,22 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int
}
}

@Test
void testConnectionSucceedsWhenPropertiesAreCorrect()
{
Properties properties = BrokerUtilities.getBrokerConnectionProperties( "eventbroker.properties" );
assertThrows( BrokerConnectionException.class, () -> BrokerConnectionFactory.testConnection( properties, 0 ) );
}

@Test
void testConnectionFailsWhenPropertiesAreIncorrect()
{
Properties properties = BrokerUtilities.getBrokerConnectionProperties( "eventbroker.properties" );

String connectionPropertyName = BrokerUtilities.getConnectionPropertyName( properties );
// Replace resolved property with url/port that is guaranteed not to have a broker running
properties.put( connectionPropertyName, "amqp://localhost:-1" );
assertThrows( BrokerConnectionException.class, () -> BrokerConnectionFactory.testConnection( properties, 0 ) );
}

}

0 comments on commit 960cd03

Please sign in to comment.