diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java index de29cbc94c295..28a46d5f4d061 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java @@ -135,6 +135,16 @@ private List>> functionAssignmentsDecode(String j return retVal; } + private String getTopicStats(String name) throws Exception { + ContainerExecResult result = pulsarCluster.getProxy().execCmd( + PulsarCluster.ADMIN_SCRIPT, + "topics", + "stats", + name + ); + return result.getStdout(); + } + private List getClusterStatus() throws Exception { val result = pulsarCluster.getAnyWorker().execCmd( PulsarCluster.ADMIN_SCRIPT, @@ -311,6 +321,7 @@ private void testRebalance() throws Exception { WorkerInfo oldClusterLeaderInfo = getClusterLeader(); log.info("Cluster leader before adding more workers is: {}", oldClusterLeaderInfo); + log.debug("Stats for the Coordinate Topic: \n{}", getTopicStats("persistent://public/functions/coordinate")); List>> startFinfos = getFunctionAssignments(); int startFuncCount = getFuncAssignmentsCount(startFinfos); @@ -334,6 +345,7 @@ private void testRebalance() throws Exception { WorkerInfo newClusterLeaderInfo = getClusterLeader(); log.info("Cluster leader after adding {} workers is: {}", numWorkersToAdd, newClusterLeaderInfo); + log.debug("Stats for the Coordinate Topic: \n{}", getTopicStats("persistent://public/functions/coordinate")); // Leadership should not have changed. assertTrue(oldClusterLeaderInfo.getWorkerId().compareTo(newClusterLeaderInfo.getWorkerId()) == 0);