Skip to content

Commit

Permalink
Fix race condition in the AbstractMessageListenerContainer lifecycle
Browse files Browse the repository at this point in the history
The `stop()` and `childStopped()` in the `ConcurrentMessageListenerContainer` use the same `lifecycleLock`,
however the last one is called from the thread of listener consumer in the child application context.

* Fix `AbstractMessageListenerContainer.stop(wait)` logic to release the `lifecycleLock`
before going to the `latch.await()`.
This still blocks the `stop()` call, but allows the other lifecycle conditions to be fulfilled,
even from different threads
  • Loading branch information
artembilan committed Sep 16, 2024
1 parent c6cf896 commit 1c16075
Showing 1 changed file with 26 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
*/
public abstract class AbstractMessageListenerContainer<K, V>
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
ApplicationContextAware {
ApplicationContextAware {

/**
* The default {@link org.springframework.context.SmartLifecycle} phase for listener
Expand Down Expand Up @@ -143,7 +143,6 @@ public abstract class AbstractMessageListenerContainer<K, V>
@Nullable
private KafkaAdmin kafkaAdmin;


/**
* Construct an instance with the provided factory and properties.
* @param consumerFactory the factory.
Expand Down Expand Up @@ -609,28 +608,36 @@ public final void stop() {
* @since 2.3.8
*/
public final void stop(boolean wait) {
this.lifecycleLock.lock();
try {
if (isRunning()) {
if (wait) {
final CountDownLatch latch = new CountDownLatch(1);
if (isRunning()) {
if (wait) {
final CountDownLatch latch = new CountDownLatch(1);
this.lifecycleLock.lock();
try {

doStop(latch::countDown);
try {
latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); // NOSONAR
publishContainerStoppedEvent();
}
catch (@SuppressWarnings("unused") InterruptedException e) {
Thread.currentThread().interrupt();
}
}
else {
finally {
this.lifecycleLock.unlock();
}
try {
latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); // NOSONAR
publishContainerStoppedEvent();
}
catch (@SuppressWarnings("unused") InterruptedException e) {
Thread.currentThread().interrupt();
}
}
else {
this.lifecycleLock.lock();
try {
doStop(this::publishContainerStoppedEvent);
}
finally {
this.lifecycleLock.unlock();
}

}
}
finally {
this.lifecycleLock.unlock();
}
}

@Override
Expand Down Expand Up @@ -706,7 +713,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
AbstractMessageListenerContainer.this.logger.info(() ->
getGroupId() + ": partitions lost: " + partitions);
getGroupId() + ": partitions lost: " + partitions);
}

};
Expand Down

0 comments on commit 1c16075

Please sign in to comment.