diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java index e4e07089..ad264928 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/ConnectionManager.java @@ -87,9 +87,30 @@ public class ConnectionManager implements AutoCloseable { public static final String EMPTY_ROUTING_KEY = ""; private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class); - private final Connection connection; + enum ConnectionType { + PUBLISH("-publish"), + CONSUME("-consume"); + + private final String nameSuffix; + private final HealthMetrics metrics = new HealthMetrics(this); + + ConnectionType(String nameSuffix) { + this.nameSuffix = nameSuffix; + } + + public String getNameSuffix() { + return nameSuffix; + } + + public HealthMetrics getMetrics() { + return metrics; + } + } + + private final Connection publishConnection; + private final Connection consumeConnection; private final Map channelsByPin = new ConcurrentHashMap<>(); - private final AtomicReference connectionIsClosed = new AtomicReference<>(State.OPEN); + private final AtomicReference connectionState = new AtomicReference<>(State.OPEN); private final ConnectionManagerConfiguration configuration; private final String subscriberName; private final AtomicInteger nextSubscriberId = new AtomicInteger(1); @@ -98,22 +119,6 @@ public class ConnectionManager implements AutoCloseable { new ThreadFactoryBuilder().setNameFormat("channel-checker-%d").build() ); - private final HealthMetrics metrics = new HealthMetrics(this); - - private final RecoveryListener recoveryListener = new RecoveryListener() { - @Override - public void handleRecovery(Recoverable recoverable) { - metrics.getReadinessMonitor().enable(); - LOGGER.info("Recovery finished. Set RabbitMQ readiness to true"); - metrics.getLivenessMonitor().enable(); - } - - @Override - public void handleRecoveryStarted(Recoverable recoverable) { - LOGGER.warn("Recovery started..."); - } - }; - public ConnectionManagerConfiguration getConfiguration() { return configuration; } @@ -124,7 +129,7 @@ public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfig Objects.requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null"); this.configuration = Objects.requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null"); - String subscriberNameTmp = ObjectUtils.defaultIfNull(connectionManagerConfiguration.getSubscriberName(), rabbitMQConfiguration.getSubscriberName()); + String subscriberNameTmp = ObjectUtils.defaultIfNull(connectionManagerConfiguration.getSubscriberName(), connectionManagerConfiguration.getSubscriberName()); if (StringUtils.isBlank(subscriberNameTmp)) { subscriberName = "rabbit_mq_subscriber." + System.currentTimeMillis(); LOGGER.info("Subscribers will use default name: {}", subscriberName); @@ -160,6 +165,17 @@ public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfig factory.setRequestedHeartbeat(connectionManagerConfiguration.getHeartbeatIntervalSeconds()); } + sharedExecutor = Executors.newFixedThreadPool(configuration.getWorkingThreads(), new ThreadFactoryBuilder() + .setNameFormat("rabbitmq-shared-pool-%d") + .build()); + factory.setSharedExecutor(sharedExecutor); + publishConnection = createConnection(factory, connectionName, ConnectionType.PUBLISH); + consumeConnection = createConnection(factory, connectionName, ConnectionType.CONSUME); + } + + private Connection createConnection(ConnectionFactory factory, String connectionName, ConnectionType connectionType) { + HealthMetrics metrics = connectionType.getMetrics(); + factory.setExceptionHandler(new ExceptionHandler() { @Override public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) { @@ -207,13 +223,13 @@ private void turnOffReadiness(Throwable exception) { } }); - factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> connectionIsClosed.get() != State.CLOSED); + factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> connectionState.get() != State.CLOSED); factory.setRecoveryDelayHandler(recoveryAttempts -> { - int minTime = connectionManagerConfiguration.getMinConnectionRecoveryTimeout(); - int maxTime = connectionManagerConfiguration.getMaxConnectionRecoveryTimeout(); - int maxRecoveryAttempts = connectionManagerConfiguration.getMaxRecoveryAttempts(); - int deviationPercent = connectionManagerConfiguration.getRetryTimeDeviationPercent(); + int minTime = configuration.getMinConnectionRecoveryTimeout(); + int maxTime = configuration.getMaxConnectionRecoveryTimeout(); + int maxRecoveryAttempts = configuration.getMaxRecoveryAttempts(); + int deviationPercent = configuration.getRetryTimeDeviationPercent(); LOGGER.debug("Try to recovery connection to RabbitMQ. Count tries = {}", recoveryAttempts); int recoveryDelay = RetryingDelay.getRecoveryDelay(recoveryAttempts, minTime, maxTime, maxRecoveryAttempts, deviationPercent); @@ -226,17 +242,14 @@ private void turnOffReadiness(Throwable exception) { return recoveryDelay; }); - sharedExecutor = Executors.newFixedThreadPool(configuration.getWorkingThreads(), new ThreadFactoryBuilder() - .setNameFormat("rabbitmq-shared-pool-%d") - .build()); - factory.setSharedExecutor(sharedExecutor); + Connection connection; try { - connection = factory.newConnection(connectionName); + connection = factory.newConnection(connectionName + connectionType.getNameSuffix()); LOGGER.info("Created RabbitMQ connection {} [{}]", connection, connection.hashCode()); - addShutdownListenerToConnection(this.connection); - addBlockedListenersToConnection(this.connection); - addRecoveryListenerToConnection(this.connection); + addShutdownListenerToConnection(connection); + addBlockedListenersToConnection(connection); + addRecoveryListenerToConnection(connection, metrics); metrics.getReadinessMonitor().enable(); LOGGER.debug("Set RabbitMQ readiness to true"); } catch (IOException | TimeoutException e) { @@ -244,6 +257,24 @@ private void turnOffReadiness(Throwable exception) { LOGGER.debug("Set RabbitMQ readiness to false. Can not create connection", e); throw new IllegalStateException("Failed to create RabbitMQ connection using configuration", e); } + + return connection; + } + + private RecoveryListener getRecoveryListener(HealthMetrics metrics) { + return new RecoveryListener() { + @Override + public void handleRecovery(Recoverable recoverable) { + metrics.getReadinessMonitor().enable(); + LOGGER.info("Recovery finished. Set RabbitMQ readiness to true"); + metrics.getLivenessMonitor().enable(); + } + + @Override + public void handleRecoveryStarted(Recoverable recoverable) { + LOGGER.warn("Recovery started..."); + } + }; } private final static List recoverableErrors = List.of( @@ -298,7 +329,7 @@ private void recoverSubscriptionsOfChannel(@NotNull final PinId pinId, Channel c if (subscriptionCallbacks != null) { - LOGGER.info("Changing channel for holder with pin id: " + pinId); + LOGGER.info("Changing channel for holder with pin id: {}", pinId); var removedHolder = channelsByPin.remove(pinId); if (removedHolder != holder) throw new IllegalStateException("Channel holder has been replaced"); @@ -335,10 +366,10 @@ private void addShutdownListenerToConnection(Connection conn) { }); } - private void addRecoveryListenerToConnection(Connection conn) { + private void addRecoveryListenerToConnection(Connection conn, HealthMetrics metrics) { if (conn instanceof Recoverable) { Recoverable recoverableConnection = (Recoverable) conn; - recoverableConnection.addRecoveryListener(recoveryListener); + recoverableConnection.addRecoveryListener(getRecoveryListener(metrics)); LOGGER.debug("Recovery listener was added to connection."); } else { throw new IllegalStateException("Connection does not implement Recoverable. Can not add RecoveryListener to it"); @@ -359,13 +390,14 @@ public void handleUnblocked() { }); } + // TODO: what should it return if one of the connections is not opened? public boolean isOpen() { - return connection.isOpen() && connectionIsClosed.get() == State.OPEN; + return (publishConnection.isOpen() && consumeConnection.isOpen()) && connectionState.get() == State.OPEN; } @Override public void close() { - if (!connectionIsClosed.compareAndSet(State.OPEN, State.CLOSING)) { + if (!connectionState.compareAndSet(State.OPEN, State.CLOSING)) { LOGGER.info("Connection manager already closed"); return; } @@ -399,28 +431,33 @@ public void close() { } } - connectionIsClosed.set(State.CLOSED); + connectionState.set(State.CLOSED); + closeConnection(publishConnection, closeTimeout); + closeConnection(consumeConnection, closeTimeout); + + shutdownExecutor(sharedExecutor, closeTimeout, "rabbit-shared"); + shutdownExecutor(channelChecker, closeTimeout, "channel-checker"); + } + + private void closeConnection(Connection connection, int timeout) { if (connection.isOpen()) { try { - connection.close(closeTimeout); + connection.close(timeout); } catch (IOException e) { - LOGGER.error("Cannot close connection", e); + LOGGER.error("Failed to close connection", e); } } - - shutdownExecutor(sharedExecutor, closeTimeout, "rabbit-shared"); - shutdownExecutor(channelChecker, closeTimeout, "channel-checker"); } public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException, InterruptedException { - ChannelHolder holder = getOrCreateChannelFor(PinId.forRoutingKey(exchange, routingKey)); + ChannelHolder holder = getOrCreateChannelFor(PinId.forRoutingKey(exchange, routingKey), publishConnection); holder.retryingPublishWithLock(configuration, body, (channel, payload) -> channel.basicPublish(exchange, routingKey, props, payload)); } public String queueDeclare() throws IOException { - ChannelHolder holder = new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions()); + ChannelHolder holder = new ChannelHolder(() -> createChannel(consumeConnection), this::waitForConnectionRecovery, configurationToOptions()); return holder.mapWithLock(channel -> { String queue = channel.queueDeclare( "", // queue name @@ -436,7 +473,7 @@ public String queueDeclare() throws IOException { public ExclusiveSubscriberMonitor basicConsume(String queue, ManualAckDeliveryCallback deliverCallback, CancelCallback cancelCallback) throws IOException, InterruptedException { PinId pinId = PinId.forQueue(queue); - ChannelHolder holder = getOrCreateChannelFor(pinId, new SubscriptionCallbacks(deliverCallback, cancelCallback)); + ChannelHolder holder = getOrCreateChannelFor(pinId, consumeConnection, new SubscriptionCallbacks(deliverCallback, cancelCallback)); String tag = holder.retryingConsumeWithLock(channel -> channel.basicConsume(queue, false, subscriberName + "_" + nextSubscriberId.getAndIncrement(), (tagTmp, delivery) -> { try { @@ -455,7 +492,7 @@ public void reject() throws IOException { LOGGER.warn("Error during basicReject of message with deliveryTag = {} inside channel #{}: {}", deliveryTag, ch.getChannelNumber(), e); throw e; } finally { - holder.release(() -> metrics.getReadinessMonitor().enable()); + holder.release(() -> ConnectionType.CONSUME.metrics.getReadinessMonitor().enable()); } }); } @@ -469,7 +506,7 @@ public void confirm() throws IOException { LOGGER.warn("Error during basicAck of message with deliveryTag = {} inside channel #{}: {}", deliveryTag, ch.getChannelNumber(), e); throw e; } finally { - holder.release(() -> metrics.getReadinessMonitor().enable()); + holder.release(() -> ConnectionType.CONSUME.metrics.getReadinessMonitor().enable()); } }); } @@ -483,7 +520,7 @@ public void confirm() throws IOException { LOGGER.warn("The confirmation for delivery {} in queue={} routing_key={} was not invoked within the specified delay", deliveryTag, queue, routingKey); if (holder.reachedPendingLimit()) { - metrics.getReadinessMonitor().disable(); + ConnectionType.CONSUME.metrics.getReadinessMonitor().disable(); } }); return false; // to cast to Callable @@ -500,11 +537,13 @@ public void confirm() throws IOException { } boolean isReady() { - return metrics.getReadinessMonitor().isEnabled(); + // TODO: what should it return if one of the connections is not ready? + return ConnectionType.PUBLISH.metrics.getReadinessMonitor().isEnabled() && ConnectionType.CONSUME.metrics.getReadinessMonitor().isEnabled(); } boolean isAlive() { - return metrics.getLivenessMonitor().isEnabled(); + // TODO: what should it return if one of the connections is disabled? + return ConnectionType.PUBLISH.metrics.getLivenessMonitor().isEnabled() && ConnectionType.CONSUME.metrics.getLivenessMonitor().isEnabled(); } private ChannelHolderOptions configurationToOptions() { @@ -520,7 +559,7 @@ private void basicCancel(Channel channel, String consumerTag) throws IOException } public String queueExclusiveDeclareAndBind(String exchange) throws IOException, TimeoutException { - try (Channel channel = createChannel()) { + try (Channel channel = createChannel(consumeConnection)) { String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, exchange, EMPTY_ROUTING_KEY); LOGGER.info("Declared the '{}' queue to listen to the '{}'", queue, exchange); @@ -551,17 +590,17 @@ public SubscriptionCallbacks(ManualAckDeliveryCallback deliverCallback, CancelCa } } - private ChannelHolder getOrCreateChannelFor(PinId pinId) { + private ChannelHolder getOrCreateChannelFor(PinId pinId, Connection connection) { return channelsByPin.computeIfAbsent(pinId, ignore -> { LOGGER.trace("Creating channel holder for {}", pinId); - return new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions()); + return new ChannelHolder(() -> createChannel(connection), this::waitForConnectionRecovery, configurationToOptions()); }); } - private ChannelHolder getOrCreateChannelFor(PinId pinId, SubscriptionCallbacks subscriptionCallbacks) { + private ChannelHolder getOrCreateChannelFor(PinId pinId, Connection connection, SubscriptionCallbacks subscriptionCallbacks) { return channelsByPin.computeIfAbsent(pinId, ignore -> { LOGGER.trace("Creating channel holder with callbacks for {}", pinId); - return new ChannelHolder(() -> createChannelWithOptionalRecovery(true), this::waitForConnectionRecovery, configurationToOptions(), subscriptionCallbacks); + return new ChannelHolder(() -> createChannelWithOptionalRecovery(connection, true), this::waitForConnectionRecovery, configurationToOptions(), subscriptionCallbacks); }); } @@ -572,11 +611,11 @@ private void putChannelFor(PinId pinId, ChannelHolder holder) { } } - private Channel createChannel() { - return createChannelWithOptionalRecovery(false); + private Channel createChannel(Connection connection) { + return createChannelWithOptionalRecovery(connection, false); } - private Channel createChannelWithOptionalRecovery(Boolean withRecovery) { + private Channel createChannelWithOptionalRecovery(Connection connection, Boolean withRecovery) { waitForConnectionRecovery(connection); try { @@ -606,7 +645,7 @@ private void waitForConnectionRecovery(ShutdownNotifier notifier, boolean waitFo } } - if (connectionIsClosed.get() == State.CLOSED) { + if (connectionState.get() == State.CLOSED) { throw new IllegalStateException("Connection is already closed"); } } @@ -626,7 +665,7 @@ private void waitForRecovery(ShutdownNotifier notifier) { private boolean isConnectionRecovery(ShutdownNotifier notifier) { return !(notifier instanceof AutorecoveringChannel) && !notifier.isOpen() - && connectionIsClosed.get() != State.CLOSED; + && connectionState.get() != State.CLOSED; } /** @@ -763,7 +802,7 @@ private static class ChannelHolder { @GuardedBy("subscribingLock") private boolean isSubscribed; @GuardedBy("lock") - private Deque redeliveryQueue = new ArrayDeque<>(); + private final Deque redeliveryQueue = new ArrayDeque<>(); private final PublisherConfirmationListener publishConfirmationListener; @@ -1068,9 +1107,7 @@ private void setupPublisherConfirmation(Channel channel) throws IOException { return; } channel.confirmSelect(); - channel.addShutdownListener(cause -> { - publishConfirmationListener.noConfirmationWillBeReceived(); - }); + channel.addShutdownListener(cause -> publishConfirmationListener.noConfirmationWillBeReceived()); channel.addConfirmListener(publishConfirmationListener); }