Skip to content

Commit

Permalink
separate connections for publishing and subscribing
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleg Smelov committed Jul 3, 2024
1 parent 27cc502 commit 804d1ff
Showing 1 changed file with 102 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,30 @@ public class ConnectionManager implements AutoCloseable {
public static final String EMPTY_ROUTING_KEY = "";
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class);

private final Connection connection;
enum ConnectionType {
PUBLISH("-publish"),
CONSUME("-consume");

private final String nameSuffix;
private final HealthMetrics metrics = new HealthMetrics(this);

ConnectionType(String nameSuffix) {
this.nameSuffix = nameSuffix;
}

public String getNameSuffix() {
return nameSuffix;
}

public HealthMetrics getMetrics() {
return metrics;
}
}

private final Connection publishConnection;
private final Connection consumeConnection;
private final Map<PinId, ChannelHolder> channelsByPin = new ConcurrentHashMap<>();
private final AtomicReference<State> connectionIsClosed = new AtomicReference<>(State.OPEN);
private final AtomicReference<State> connectionState = new AtomicReference<>(State.OPEN);
private final ConnectionManagerConfiguration configuration;
private final String subscriberName;
private final AtomicInteger nextSubscriberId = new AtomicInteger(1);
Expand All @@ -98,22 +119,6 @@ public class ConnectionManager implements AutoCloseable {
new ThreadFactoryBuilder().setNameFormat("channel-checker-%d").build()
);

private final HealthMetrics metrics = new HealthMetrics(this);

private final RecoveryListener recoveryListener = new RecoveryListener() {
@Override
public void handleRecovery(Recoverable recoverable) {
metrics.getReadinessMonitor().enable();
LOGGER.info("Recovery finished. Set RabbitMQ readiness to true");
metrics.getLivenessMonitor().enable();
}

@Override
public void handleRecoveryStarted(Recoverable recoverable) {
LOGGER.warn("Recovery started...");
}
};

public ConnectionManagerConfiguration getConfiguration() {
return configuration;
}
Expand All @@ -124,7 +129,7 @@ public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfig
Objects.requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null");
this.configuration = Objects.requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null");

String subscriberNameTmp = ObjectUtils.defaultIfNull(connectionManagerConfiguration.getSubscriberName(), rabbitMQConfiguration.getSubscriberName());
String subscriberNameTmp = ObjectUtils.defaultIfNull(connectionManagerConfiguration.getSubscriberName(), connectionManagerConfiguration.getSubscriberName());
if (StringUtils.isBlank(subscriberNameTmp)) {
subscriberName = "rabbit_mq_subscriber." + System.currentTimeMillis();
LOGGER.info("Subscribers will use default name: {}", subscriberName);
Expand Down Expand Up @@ -160,6 +165,17 @@ public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfig
factory.setRequestedHeartbeat(connectionManagerConfiguration.getHeartbeatIntervalSeconds());
}

sharedExecutor = Executors.newFixedThreadPool(configuration.getWorkingThreads(), new ThreadFactoryBuilder()
.setNameFormat("rabbitmq-shared-pool-%d")
.build());
factory.setSharedExecutor(sharedExecutor);
publishConnection = createConnection(factory, connectionName, ConnectionType.PUBLISH);
consumeConnection = createConnection(factory, connectionName, ConnectionType.CONSUME);
}

private Connection createConnection(ConnectionFactory factory, String connectionName, ConnectionType connectionType) {
HealthMetrics metrics = connectionType.getMetrics();

factory.setExceptionHandler(new ExceptionHandler() {
@Override
public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
Expand Down Expand Up @@ -207,13 +223,13 @@ private void turnOffReadiness(Throwable exception) {
}
});

factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> connectionIsClosed.get() != State.CLOSED);
factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> connectionState.get() != State.CLOSED);

factory.setRecoveryDelayHandler(recoveryAttempts -> {
int minTime = connectionManagerConfiguration.getMinConnectionRecoveryTimeout();
int maxTime = connectionManagerConfiguration.getMaxConnectionRecoveryTimeout();
int maxRecoveryAttempts = connectionManagerConfiguration.getMaxRecoveryAttempts();
int deviationPercent = connectionManagerConfiguration.getRetryTimeDeviationPercent();
int minTime = configuration.getMinConnectionRecoveryTimeout();
int maxTime = configuration.getMaxConnectionRecoveryTimeout();
int maxRecoveryAttempts = configuration.getMaxRecoveryAttempts();
int deviationPercent = configuration.getRetryTimeDeviationPercent();

LOGGER.debug("Try to recovery connection to RabbitMQ. Count tries = {}", recoveryAttempts);
int recoveryDelay = RetryingDelay.getRecoveryDelay(recoveryAttempts, minTime, maxTime, maxRecoveryAttempts, deviationPercent);
Expand All @@ -226,24 +242,39 @@ private void turnOffReadiness(Throwable exception) {
return recoveryDelay;
});

sharedExecutor = Executors.newFixedThreadPool(configuration.getWorkingThreads(), new ThreadFactoryBuilder()
.setNameFormat("rabbitmq-shared-pool-%d")
.build());
factory.setSharedExecutor(sharedExecutor);
Connection connection;

try {
connection = factory.newConnection(connectionName);
connection = factory.newConnection(connectionName + connectionType.getNameSuffix());
LOGGER.info("Created RabbitMQ connection {} [{}]", connection, connection.hashCode());
addShutdownListenerToConnection(this.connection);
addBlockedListenersToConnection(this.connection);
addRecoveryListenerToConnection(this.connection);
addShutdownListenerToConnection(connection);
addBlockedListenersToConnection(connection);
addRecoveryListenerToConnection(connection, metrics);
metrics.getReadinessMonitor().enable();
LOGGER.debug("Set RabbitMQ readiness to true");
} catch (IOException | TimeoutException e) {
metrics.getReadinessMonitor().disable();
LOGGER.debug("Set RabbitMQ readiness to false. Can not create connection", e);
throw new IllegalStateException("Failed to create RabbitMQ connection using configuration", e);
}

return connection;
}

private RecoveryListener getRecoveryListener(HealthMetrics metrics) {
return new RecoveryListener() {
@Override
public void handleRecovery(Recoverable recoverable) {
metrics.getReadinessMonitor().enable();
LOGGER.info("Recovery finished. Set RabbitMQ readiness to true");
metrics.getLivenessMonitor().enable();
}

@Override
public void handleRecoveryStarted(Recoverable recoverable) {
LOGGER.warn("Recovery started...");
}
};
}

private final static List<String> recoverableErrors = List.of(
Expand Down Expand Up @@ -298,7 +329,7 @@ private void recoverSubscriptionsOfChannel(@NotNull final PinId pinId, Channel c

if (subscriptionCallbacks != null) {

LOGGER.info("Changing channel for holder with pin id: " + pinId);
LOGGER.info("Changing channel for holder with pin id: {}", pinId);

var removedHolder = channelsByPin.remove(pinId);
if (removedHolder != holder) throw new IllegalStateException("Channel holder has been replaced");
Expand Down Expand Up @@ -335,10 +366,10 @@ private void addShutdownListenerToConnection(Connection conn) {
});
}

private void addRecoveryListenerToConnection(Connection conn) {
private void addRecoveryListenerToConnection(Connection conn, HealthMetrics metrics) {
if (conn instanceof Recoverable) {
Recoverable recoverableConnection = (Recoverable) conn;
recoverableConnection.addRecoveryListener(recoveryListener);
recoverableConnection.addRecoveryListener(getRecoveryListener(metrics));
LOGGER.debug("Recovery listener was added to connection.");
} else {
throw new IllegalStateException("Connection does not implement Recoverable. Can not add RecoveryListener to it");
Expand All @@ -359,13 +390,14 @@ public void handleUnblocked() {
});
}

// TODO: what should it return if one of the connections is not opened?
public boolean isOpen() {
return connection.isOpen() && connectionIsClosed.get() == State.OPEN;
return (publishConnection.isOpen() && consumeConnection.isOpen()) && connectionState.get() == State.OPEN;
}

@Override
public void close() {
if (!connectionIsClosed.compareAndSet(State.OPEN, State.CLOSING)) {
if (!connectionState.compareAndSet(State.OPEN, State.CLOSING)) {
LOGGER.info("Connection manager already closed");
return;
}
Expand Down Expand Up @@ -399,28 +431,33 @@ public void close() {
}
}

connectionIsClosed.set(State.CLOSED);
connectionState.set(State.CLOSED);

closeConnection(publishConnection, closeTimeout);
closeConnection(consumeConnection, closeTimeout);

shutdownExecutor(sharedExecutor, closeTimeout, "rabbit-shared");
shutdownExecutor(channelChecker, closeTimeout, "channel-checker");
}

private void closeConnection(Connection connection, int timeout) {
if (connection.isOpen()) {
try {
connection.close(closeTimeout);
connection.close(timeout);
} catch (IOException e) {
LOGGER.error("Cannot close connection", e);
LOGGER.error("Failed to close connection", e);
}
}

shutdownExecutor(sharedExecutor, closeTimeout, "rabbit-shared");
shutdownExecutor(channelChecker, closeTimeout, "channel-checker");
}

public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException, InterruptedException {
ChannelHolder holder = getOrCreateChannelFor(PinId.forRoutingKey(exchange, routingKey));
ChannelHolder holder = getOrCreateChannelFor(PinId.forRoutingKey(exchange, routingKey), publishConnection);
holder.retryingPublishWithLock(configuration, body,
(channel, payload) -> channel.basicPublish(exchange, routingKey, props, payload));
}

public String queueDeclare() throws IOException {
ChannelHolder holder = new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions());
ChannelHolder holder = new ChannelHolder(() -> createChannel(consumeConnection), this::waitForConnectionRecovery, configurationToOptions());
return holder.mapWithLock(channel -> {
String queue = channel.queueDeclare(
"", // queue name
Expand All @@ -436,7 +473,7 @@ public String queueDeclare() throws IOException {

public ExclusiveSubscriberMonitor basicConsume(String queue, ManualAckDeliveryCallback deliverCallback, CancelCallback cancelCallback) throws IOException, InterruptedException {
PinId pinId = PinId.forQueue(queue);
ChannelHolder holder = getOrCreateChannelFor(pinId, new SubscriptionCallbacks(deliverCallback, cancelCallback));
ChannelHolder holder = getOrCreateChannelFor(pinId, consumeConnection, new SubscriptionCallbacks(deliverCallback, cancelCallback));
String tag = holder.retryingConsumeWithLock(channel ->
channel.basicConsume(queue, false, subscriberName + "_" + nextSubscriberId.getAndIncrement(), (tagTmp, delivery) -> {
try {
Expand All @@ -455,7 +492,7 @@ public void reject() throws IOException {
LOGGER.warn("Error during basicReject of message with deliveryTag = {} inside channel #{}: {}", deliveryTag, ch.getChannelNumber(), e);
throw e;
} finally {
holder.release(() -> metrics.getReadinessMonitor().enable());
holder.release(() -> ConnectionType.CONSUME.metrics.getReadinessMonitor().enable());
}
});
}
Expand All @@ -469,7 +506,7 @@ public void confirm() throws IOException {
LOGGER.warn("Error during basicAck of message with deliveryTag = {} inside channel #{}: {}", deliveryTag, ch.getChannelNumber(), e);
throw e;
} finally {
holder.release(() -> metrics.getReadinessMonitor().enable());
holder.release(() -> ConnectionType.CONSUME.metrics.getReadinessMonitor().enable());
}
});
}
Expand All @@ -483,7 +520,7 @@ public void confirm() throws IOException {
LOGGER.warn("The confirmation for delivery {} in queue={} routing_key={} was not invoked within the specified delay",
deliveryTag, queue, routingKey);
if (holder.reachedPendingLimit()) {
metrics.getReadinessMonitor().disable();
ConnectionType.CONSUME.metrics.getReadinessMonitor().disable();
}
});
return false; // to cast to Callable
Expand All @@ -500,11 +537,13 @@ public void confirm() throws IOException {
}

boolean isReady() {
return metrics.getReadinessMonitor().isEnabled();
// TODO: what should it return if one of the connections is not ready?
return ConnectionType.PUBLISH.metrics.getReadinessMonitor().isEnabled() && ConnectionType.CONSUME.metrics.getReadinessMonitor().isEnabled();
}

boolean isAlive() {
return metrics.getLivenessMonitor().isEnabled();
// TODO: what should it return if one of the connections is disabled?
return ConnectionType.PUBLISH.metrics.getLivenessMonitor().isEnabled() && ConnectionType.CONSUME.metrics.getLivenessMonitor().isEnabled();
}

private ChannelHolderOptions configurationToOptions() {
Expand All @@ -520,7 +559,7 @@ private void basicCancel(Channel channel, String consumerTag) throws IOException
}

public String queueExclusiveDeclareAndBind(String exchange) throws IOException, TimeoutException {
try (Channel channel = createChannel()) {
try (Channel channel = createChannel(consumeConnection)) {
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, exchange, EMPTY_ROUTING_KEY);
LOGGER.info("Declared the '{}' queue to listen to the '{}'", queue, exchange);
Expand Down Expand Up @@ -551,17 +590,17 @@ public SubscriptionCallbacks(ManualAckDeliveryCallback deliverCallback, CancelCa
}
}

private ChannelHolder getOrCreateChannelFor(PinId pinId) {
private ChannelHolder getOrCreateChannelFor(PinId pinId, Connection connection) {
return channelsByPin.computeIfAbsent(pinId, ignore -> {
LOGGER.trace("Creating channel holder for {}", pinId);
return new ChannelHolder(this::createChannel, this::waitForConnectionRecovery, configurationToOptions());
return new ChannelHolder(() -> createChannel(connection), this::waitForConnectionRecovery, configurationToOptions());
});
}

private ChannelHolder getOrCreateChannelFor(PinId pinId, SubscriptionCallbacks subscriptionCallbacks) {
private ChannelHolder getOrCreateChannelFor(PinId pinId, Connection connection, SubscriptionCallbacks subscriptionCallbacks) {
return channelsByPin.computeIfAbsent(pinId, ignore -> {
LOGGER.trace("Creating channel holder with callbacks for {}", pinId);
return new ChannelHolder(() -> createChannelWithOptionalRecovery(true), this::waitForConnectionRecovery, configurationToOptions(), subscriptionCallbacks);
return new ChannelHolder(() -> createChannelWithOptionalRecovery(connection, true), this::waitForConnectionRecovery, configurationToOptions(), subscriptionCallbacks);
});
}

Expand All @@ -572,11 +611,11 @@ private void putChannelFor(PinId pinId, ChannelHolder holder) {
}
}

private Channel createChannel() {
return createChannelWithOptionalRecovery(false);
private Channel createChannel(Connection connection) {
return createChannelWithOptionalRecovery(connection, false);
}

private Channel createChannelWithOptionalRecovery(Boolean withRecovery) {
private Channel createChannelWithOptionalRecovery(Connection connection, Boolean withRecovery) {
waitForConnectionRecovery(connection);

try {
Expand Down Expand Up @@ -606,7 +645,7 @@ private void waitForConnectionRecovery(ShutdownNotifier notifier, boolean waitFo
}
}

if (connectionIsClosed.get() == State.CLOSED) {
if (connectionState.get() == State.CLOSED) {
throw new IllegalStateException("Connection is already closed");
}
}
Expand All @@ -626,7 +665,7 @@ private void waitForRecovery(ShutdownNotifier notifier) {

private boolean isConnectionRecovery(ShutdownNotifier notifier) {
return !(notifier instanceof AutorecoveringChannel) && !notifier.isOpen()
&& connectionIsClosed.get() != State.CLOSED;
&& connectionState.get() != State.CLOSED;
}

/**
Expand Down Expand Up @@ -763,7 +802,7 @@ private static class ChannelHolder {
@GuardedBy("subscribingLock")
private boolean isSubscribed;
@GuardedBy("lock")
private Deque<PublicationHolder> redeliveryQueue = new ArrayDeque<>();
private final Deque<PublicationHolder> redeliveryQueue = new ArrayDeque<>();

private final PublisherConfirmationListener publishConfirmationListener;

Expand Down Expand Up @@ -1068,9 +1107,7 @@ private void setupPublisherConfirmation(Channel channel) throws IOException {
return;
}
channel.confirmSelect();
channel.addShutdownListener(cause -> {
publishConfirmationListener.noConfirmationWillBeReceived();
});
channel.addShutdownListener(cause -> publishConfirmationListener.noConfirmationWillBeReceived());
channel.addConfirmListener(publishConfirmationListener);
}

Expand Down

0 comments on commit 804d1ff

Please sign in to comment.