From 8d165803e948c770ff87d806fd54b94b7b6c8632 Mon Sep 17 00:00:00 2001 From: Heesung Sohn <103456639+heesung-sn@users.noreply.github.com> Date: Wed, 13 Dec 2023 19:54:03 -0800 Subject: [PATCH] [fix][broker] Fixed ServiceUnitStateChannel monitor to tombstone only inactive bundle states (#21721) --- .../extensions/channel/ServiceUnitStateChannelImpl.java | 2 +- .../extensions/channel/ServiceUnitStateChannelTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 6c49283745bcb..713d98b72507e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1446,7 +1446,7 @@ protected void monitorOwnerships(List brokers) { continue; } - if (now - stateData.timestamp() > stateTombstoneDelayTimeInMillis) { + if (!isActiveState(state) && now - stateData.timestamp() > stateTombstoneDelayTimeInMillis) { log.info("Found semi-terminal states to tombstone" + " serviceUnit:{}, stateData:{}", serviceUnit, stateData); tombstoneAsync(serviceUnit).whenComplete((__, e) -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 920a13cc87963..f99a167ff4883 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -639,7 +639,7 @@ public void splitAndRetryTest() throws Exception { var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; validateMonitorCounters(leader, 0, - 3, + 1, 0, 0, 0, @@ -1409,7 +1409,7 @@ public void splitAndRetryFailureTest() throws Exception { validateMonitorCounters(leader, 0, - 3, + 1, 1, 0, 0,