diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java index de6cf0f0e1..e19021b96e 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java +++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java @@ -190,13 +190,18 @@ void setBuffer(ByteBuf buffer) { return drainCommands(stack); } - private Deque> drainStack() { + private Deque> drainStackUponChannelInactive() { final Deque> target = new ArrayDeque<>(stack.size()); RedisCommand cmd; while ((cmd = stack.poll()) != null) { - if (!cmd.isDone() && !ActivationCommand.isActivationCommand(cmd)) { - target.add(cmd); + if (!cmd.isDone()) { + if (!ActivationCommand.isActivationCommand(cmd)) { + target.add(cmd); + } else { + cmd.completeExceptionally( + new RedisConnectionException("activation command won't be retried upon channel inactive")); + } } } @@ -379,7 +384,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { endpoint.notifyChannelInactive(ctx.channel()); Deque> autoBatchFlushRetryableDrainQueuedCommands = UnmodifiableDeque.emptyDeque(); if (supportsAutoBatchFlush) { - autoBatchFlushRetryableDrainQueuedCommands = drainStack(); + autoBatchFlushRetryableDrainQueuedCommands = drainStackUponChannelInactive(); } else { endpoint.notifyDrainQueuedCommands(this); } diff --git a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java index 2bb6d1d8ac..f433b21329 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java @@ -133,6 +133,8 @@ protected static void cancelCommandOnEndpointClose(RedisCommand cmd) { protected volatile @Nonnull ContextualChannel channel = DummyContextualChannelInstances.CHANNEL_CONNECTING; + private volatile Throwable failedToReconnectReason; + private final Consumer> callbackOnClose; private final boolean rejectCommandsWhileDisconnected; @@ -153,19 +155,17 @@ protected static void cancelCommandOnEndpointClose(RedisCommand cmd) { private ConnectionFacade connectionFacade; - private volatile Throwable connectionError; - private final String cachedEndpointId; protected final UnboundedOfferFirstQueue taskQueue; private final boolean canFire; - private volatile boolean inProtectMode; + private volatile EventLoop lastEventLoop = null; - private volatile Throwable failedToReconnectReason; + private volatile Throwable connectionError; - private volatile EventLoop lastEventLoop = null; + private volatile boolean inProtectMode; private final int writeSpinCount; @@ -316,17 +316,16 @@ private void writeAndFlushActivationCommands(ContextualChannel chan, @Override public void notifyChannelActive(Channel channel) { final ContextualChannel contextualChannel = new ContextualChannel(channel, ConnectionContext.State.CONNECTED); - - this.logPrefix = null; - this.connectionError = null; - if (!CHANNEL.compareAndSet(this, DummyContextualChannelInstances.CHANNEL_CONNECTING, contextualChannel)) { channel.close(); onUnexpectedState("notifyChannelActive", ConnectionContext.State.CONNECTING); return; } - lastEventLoop = channel.eventLoop(); + this.lastEventLoop = channel.eventLoop(); + this.connectionError = null; + this.inProtectMode = false; + this.logPrefix = null; // Created a synchronize-before with set channel to CHANNEL_CONNECTING, if (isClosed()) { @@ -398,7 +397,7 @@ public void notifyChannelInactive(Channel channel) { @Override public void notifyChannelInactiveAfterWatchdogDecision(Channel channel, - Deque> retryableQueuedCommands) { + Deque> retryablePendingCommands) { final ContextualChannel inactiveChan = this.channel; if (!inactiveChan.context.initialState.isConnected()) { logger.error("[unexpected][{}] notifyChannelInactive: channel initial state not connected", logPrefix()); @@ -446,7 +445,7 @@ public void notifyChannelInactiveAfterWatchdogDecision(Channel channel, CHANNEL.set(this, DummyContextualChannelInstances.CHANNEL_ENDPOINT_CLOSED); } inactiveChan.context - .setCloseStatus(new ConnectionContext.CloseStatus(willReconnect, retryableQueuedCommands, exception)); + .setCloseStatus(new ConnectionContext.CloseStatus(willReconnect, retryablePendingCommands, exception)); trySetEndpointQuiescence(inactiveChan); } @@ -945,11 +944,11 @@ private RedisCommand processActivationCommand(RedisCommand