diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java index 30b6e34b44e..61398e27042 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java @@ -503,7 +503,8 @@ private void executeMessagesAndCollectResults() throws InterruptedException { boolean skipOffsetCommandHasBeenProcessed = false; for (Map.Entry> entry: storeAdminOperationsMapWithOffset.entrySet()) { if (!entry.getValue().isEmpty()) { - if (checkOffsetToSkip(entry.getValue().peek().getOffset(), false)) { + long adminMessageOffset = entry.getValue().peek().getOffset(); + if (checkOffsetToSkip(adminMessageOffset, false)) { entry.getValue().remove(); skipOffsetCommandHasBeenProcessed = true; } @@ -526,7 +527,7 @@ private void executeMessagesAndCollectResults() throws InterruptedException { LOGGER.info( "Adding admin message from store {} with offset {} to the task list", entry.getKey(), - entry.getValue().peek().getOffset()); + adminMessageOffset); this.tasks.add(newTask); stores.add(entry.getKey()); }