Skip to content

Commit

Permalink
Closes #95: Race condition during shutdown observed
Browse files Browse the repository at this point in the history
At least I assume it closes this race condition. The shutdown process
sets the runflag to false, and then, if we're in consumer.receive(),
close the JMS Session - "from the outside" of the actual processing
thread.

"On the inside", we check whether the received message is null. However,
we can just be on our way out of consumer.receive() with a brand new
message, and *then* the runflag is set to false and JMS Session is
closed. Thus, the previous logic would not catch that, and start
processing the message, but fail when trying to commit the now-closed
JMS Session.

The new logic instead directly checks the runFlag, and bails if it is
false even though we have a message. We close the session, ensuring that
the message reception is rolled back, and exit. Lots of comments in
code, so feel free to read up!
  • Loading branch information
stolsvik committed Jan 7, 2025
1 parent 9c36df8 commit 354f43d
Showing 1 changed file with 56 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -355,35 +355,78 @@ private void runner() {
try {
if (log.isDebugEnabled()) log.debug(LOG_PREFIX
+ "Going into JMS consumer.receive() for [" + destination + "].");
// Stating that we're waiting in receive, so that the shutdown procedure then can close the JMS
// Session, which will make this receive() return null.
_processorInReceive = true;
message = jmsConsumer.receive();
}
finally {
// Stating that we're no longer in receive, so that the shutdown procedure will wait for us to
// finish the message processing before closing the JMS Session.
_processorInReceive = false;
/*
* NOTE: Important ordering between this 'processInReceive' logic, the shutdown procedure, and
* the checking of the run-flag just below: The shutdown procedure as the very first step sets
* the run-flag to false, and chooses whether to close the JMS Session based on the
* 'processInReceive' flag. If we get a race here, where the shutdown procedure sets the
* run-flag to false, and then closes the session (since it saw 'processInReceive=true'), but
* then we get a message from the receive() call nevertheless due to it winning a race, the
* checking of the run-flag below will catch this: The Session will be closed - ensuring that
* the message will be eventually re-delivered to another instance of this stage - and then
* we'll exit *before* processing the message. If the shutdown procedure sees
* 'processInReceive=false', it will *not* close the session, and instead let us fully finish
* the processing of the message (in a grace period), and then we'll see the 'runFlag=false',
* and thus close and exit *after* processing the message.
*/
}
long[] nanos_Received = { System.nanoTime() };
Instant[] instant_Received = { Instant.now() };

// Need to check whether the JMS Message gotten is null, as that signals that the
// Consumer, Session or Connection was closed from another thread.
if (message == null) {
// ?: Are we shut down?
if (!_runFlag) {
// -> Yes, down
log.info(LOG_PREFIX + "Got null from JMS consumer.receive(), and run-flag is false."
// ?: Are we shut down?
if (!_runFlag) {
// -> Yes, we're shut down
// ?: Did we also get a null message?
// (Shutdown is initiated "from the outside", where it first sets the runFlag to false, and then
// closes the JMS Session if 'processInReceive=true'. This shall lead to us getting a null
// message.)
if (message == null) {
// -> Yes, we got a null message.
log.info(LOG_PREFIX + "Run-flag is false, and we got null from JMS consumer.receive()."
+ " Breaking out of run-loop to exit.");
// Since this means that the session was closed "from the outside", we'll NOT close it
// from our side (here in the processor thread)
nullFromReceiveThusSessionIsClosed = true;
break OUTER;
}
else {
// -> No, not down: Something strange has happened.
log.warn(LOG_PREFIX + "!! Got null from JMS consumer.receive(), but run-flag is still"
+ " true. Closing current JmsSessionHolder to clean up. Looping to get new.");
closeCurrentSessionHolder();
continue OUTER;
// -> No, we got a message (not null!)
// (Since the shutdown is async, we can potentially be on the way out from
// consumer.receive() with a new message in hand, but not yet set 'processInReceive=false',
// and then the runFlag is set to false "from the outside", and the JMS Session is closed.)
log.info(LOG_PREFIX + "Run-flag is false, but we had just gotten a message from"
+ " consumer.receive(). Breaking out of run-loop to exit, and also close"
+ " current JmsSessionHolder to be sure about clean up. The message reception"
+ " will be rolled back, and the message will eventually be re-delivered to"
+ " another instance of this stage.");
// NOTE: We'll NOT set 'nullFromReceiveThusSessionIsClosed', as we want to ensure that the
// JMS Session is really closed. The closing (either from outside or inside) will implicitly
// roll back the reception, and the message will be re-delivered to another instance of this
// stage.
}
break OUTER;
}

// ?: Did we get a null message? (but runFlag was still true)
// (This should only happen on shutdown when the JMS Session is closed "from the outside", and
// where the runFlag has first been set to false - and should thus have been caught above!)
if (message == null) {
// -> Yes, we got a null message, while the runFlag evidently still was true (as otherwise
// it would have been handled in the above checking of runFlag).
// This should not happen, and probably indicates a bad JMS situation. We'll close the current
// JmsSessionHolder to clean up, and then loop to get a new fresh one.
log.warn(LOG_PREFIX + "!! Got null from JMS consumer.receive(), but run-flag is still"
+ " true. Closing current JmsSessionHolder to clean up. Looping to get new.");
closeCurrentSessionHolder();
continue OUTER;
}

// Fetch JMS Delivery Count, for interceptor context. This starts at 1 for first delivery.
Expand Down

0 comments on commit 354f43d

Please sign in to comment.