From 78de20c7014f8dc1b86e3e1bb0d165fcb0e06911 Mon Sep 17 00:00:00 2001 From: Min Huang Date: Tue, 12 Nov 2024 13:00:50 -0800 Subject: [PATCH] [controller] Fixed a bug that logger is trying to access a removed map entry (#1298) The top store admin operation entry is already removed before the logger trying to access the top entry. This PR fixes this bug by storing the offset of top entry before hand. --- .../controller/kafka/consumer/AdminConsumptionTask.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java index 30b6e34b44e..61398e27042 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java @@ -503,7 +503,8 @@ private void executeMessagesAndCollectResults() throws InterruptedException { boolean skipOffsetCommandHasBeenProcessed = false; for (Map.Entry> entry: storeAdminOperationsMapWithOffset.entrySet()) { if (!entry.getValue().isEmpty()) { - if (checkOffsetToSkip(entry.getValue().peek().getOffset(), false)) { + long adminMessageOffset = entry.getValue().peek().getOffset(); + if (checkOffsetToSkip(adminMessageOffset, false)) { entry.getValue().remove(); skipOffsetCommandHasBeenProcessed = true; } @@ -526,7 +527,7 @@ private void executeMessagesAndCollectResults() throws InterruptedException { LOGGER.info( "Adding admin message from store {} with offset {} to the task list", entry.getKey(), - entry.getValue().peek().getOffset()); + adminMessageOffset); this.tasks.add(newTask); stores.add(entry.getKey()); }