From 03c970a7a6f84444a972f9ea8cf42fd06b687722 Mon Sep 17 00:00:00 2001 From: Hao Xu Date: Tue, 26 Nov 2024 10:34:54 -0800 Subject: [PATCH 1/3] Make disk quota violation more informative. --- .../AbstractKafkaConsumerService.java | 2 +- .../consumer/AggKafkaConsumerService.java | 2 +- .../kafka/consumer/KafkaConsumerService.java | 24 +++++++++++++++---- .../KafkaConsumerServiceDelegator.java | 4 ++-- .../consumer/StorageUtilizationManager.java | 11 +++++---- .../consumer/TopicPartitionIngestionInfo.java | 7 ++++++ .../consumer/KafkaConsumerServiceTest.java | 2 +- .../KafkaStoreIngestionServiceTest.java | 2 +- 8 files changed, 39 insertions(+), 15 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractKafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractKafkaConsumerService.java index bf6c68564ec..95c03f3ac0b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractKafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractKafkaConsumerService.java @@ -44,7 +44,7 @@ public abstract long getLatestOffsetBasedOnMetrics( PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition); - public abstract Map getIngestionInfoFromConsumer( + public abstract Map getIngestionInfoFor( PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java index 9c80e6674e3..d75c1601fd4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java @@ -484,7 +484,7 @@ byte[] getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSub for (String kafkaUrl: kafkaServerToConsumerServiceMap.keySet()) { AbstractKafkaConsumerService consumerService = getKafkaConsumerService(kafkaUrl); Map topicPartitionIngestionInfoMap = - consumerService.getIngestionInfoFromConsumer(versionTopic, pubSubTopicPartition); + consumerService.getIngestionInfoFor(versionTopic, pubSubTopicPartition); for (Map.Entry entry: topicPartitionIngestionInfoMap .entrySet()) { PubSubTopicPartition topicPartition = entry.getKey(); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java index f5f654a0625..285ae3ee4d1 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java @@ -395,15 +395,24 @@ public long getMaxElapsedTimeMSSinceLastPollInConsumerPool() { * are zero-based and ConsumptionTasks are submitted to the executor in order. */ Thread slowestThread = threadFactory.getThread(slowestTaskId); - + SharedKafkaConsumer consumer = consumerToConsumptionTask.getByIndex(slowestTaskId).getKey(); + Map topicPartitionIngestionInfoMap = + getIngestionInfoFromConsumer(consumer); + // Convert Map of ingestion info for this consumer to String for logging with each partition line by line + StringBuilder sb = new StringBuilder(); + for (Map.Entry entry: topicPartitionIngestionInfoMap + .entrySet()) { + sb.append(entry.getKey().toString()).append(": ").append(entry.getValue().toString()).append("\n"); + } // log the slowest consumer id if it couldn't make any progress in a minute! LOGGER.warn( - "Shared consumer ({} - task {}) couldn't make any progress for over {} ms, thread name: {}, stack trace:\n{}", + "Shared consumer ({} - task {}) couldn't make any progress for over {} ms, thread name: {}, stack trace:\n{}, consumer info:\n{}", kafkaUrl, slowestTaskId, maxElapsedTimeSinceLastPollInConsumerPool, slowestThread != null ? slowestThread.getName() : null, - ExceptionUtils.threadToThrowableToString(slowestThread)); + ExceptionUtils.threadToThrowableToString(slowestThread), + sb.toString()); } } return maxElapsedTimeSinceLastPollInConsumerPool; @@ -530,10 +539,17 @@ private long getSomeOffsetFor( } } - public Map getIngestionInfoFromConsumer( + public Map getIngestionInfoFor( PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) { SharedKafkaConsumer consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition); + Map topicPartitionIngestionInfoMap = + getIngestionInfoFromConsumer(consumer); + return topicPartitionIngestionInfoMap; + } + + private Map getIngestionInfoFromConsumer( + SharedKafkaConsumer consumer) { Map topicPartitionIngestionInfoMap = new HashMap<>(); if (consumer != null) { ConsumptionTask consumptionTask = consumerToConsumptionTask.get(consumer); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java index d018a000a00..c93ba29d81c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java @@ -219,12 +219,12 @@ public long getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicP } @Override - public Map getIngestionInfoFromConsumer( + public Map getIngestionInfoFor( PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) { KafkaConsumerService kafkaConsumerService = getKafkaConsumerService(versionTopic, pubSubTopicPartition); if (kafkaConsumerService != null) { - return kafkaConsumerService.getIngestionInfoFromConsumer(versionTopic, pubSubTopicPartition); + return kafkaConsumerService.getIngestionInfoFor(versionTopic, pubSubTopicPartition); } else { LOGGER.warn( "No consumer service found for version topic {} and partition {} when fetching ingestion info" diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StorageUtilizationManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StorageUtilizationManager.java index 9cb38de4467..5fe89294e9f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StorageUtilizationManager.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StorageUtilizationManager.java @@ -281,14 +281,15 @@ private void enforcePartitionQuota(int partition, PartitionConsumptionState pcs, boolean shouldLogQuotaExceeded = !REDUNDANT_LOGGING_FILTER.isRedundantException(msgIdentifier); if (shouldLogQuotaExceeded) { LOGGER.info( - "Quota exceeded for store {} partition {}, paused this partition. {}", - storeName, + "Quota exceeded for store version {} partition {}, paused this partition. Partition disk usage: {} >= partition quota: {}", + versionTopic, partition, - versionTopic); + storagePartitionDiskUsage.getUsage(), + diskQuotaPerPartition); } - } else { /** we have free space for this partition */ + } else { /** - * Paused partitions could be resumed + * We have free space for this partition, paused partitions could be resumed */ ingestionNotificationDispatcher.reportQuotaNotViolated(pcs); if (isPartitionPausedIngestion(partition)) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/TopicPartitionIngestionInfo.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/TopicPartitionIngestionInfo.java index 6e3cfdc4efd..83ee8e069ad 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/TopicPartitionIngestionInfo.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/TopicPartitionIngestionInfo.java @@ -113,4 +113,11 @@ public int hashCode() { result = 31 * result + versionTopicName.hashCode(); return result; } + + @Override + public String toString() { + return "{" + "latestOffset:" + latestOffset + ", offsetLag:" + offsetLag + ", msgRate:" + msgRate + ", byteRate:" + + byteRate + ", consumerIdStr:" + consumerIdStr + ", elapsedTimeSinceLastPollInMs:" + + elapsedTimeSinceLastPollInMs + ", versionTopicName:" + versionTopicName + '}'; + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.java index 897dc96eaa9..5cc2a4ae52c 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.java @@ -205,7 +205,7 @@ public void testGetTopicPartitionIngestionInformation() throws Exception { TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.SECONDS, true, true, () -> { verify(consumer1, atLeastOnce()).poll(anyLong()); Map topicPartitionIngestionInfoMap = - consumerService.getIngestionInfoFromConsumer(versionTopic, topicPartition); + consumerService.getIngestionInfoFor(versionTopic, topicPartition); Assert.assertEquals(topicPartitionIngestionInfoMap.size(), 1); Assert.assertTrue(topicPartitionIngestionInfoMap.containsKey(topicPartition)); Assert.assertTrue(topicPartitionIngestionInfoMap.get(topicPartition).getConsumerIdStr().contains("0")); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index 0eb6a644138..f8eb64f5141 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -460,7 +460,7 @@ public void testStoreIngestionTaskShutdownLastPartition(boolean isIsolatedIngest AbstractKafkaConsumerService kafkaConsumerService = spy(storeIngestionTask.aggKafkaConsumerService.createKafkaConsumerService(consumerProperties)); kafkaStoreIngestionService.getTopicPartitionIngestionContext(topicName, topicName, 0); - verify(kafkaConsumerService, atMostOnce()).getIngestionInfoFromConsumer(pubSubTopic, pubSubTopicPartition); + verify(kafkaConsumerService, atMostOnce()).getIngestionInfoFor(pubSubTopic, pubSubTopicPartition); } @Test From f1d938c2a9229d385d1f9bbda1ad5417024eff05 Mon Sep 17 00:00:00 2001 From: Hao Xu Date: Thu, 5 Dec 2024 15:49:48 -0800 Subject: [PATCH 2/3] Change logging frequency from 1 min to 10 mins. --- .../linkedin/davinci/kafka/consumer/KafkaConsumerService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java index 285ae3ee4d1..7706c2e6d0b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java @@ -89,7 +89,7 @@ public abstract class KafkaConsumerService extends AbstractKafkaConsumerService private final ExecutorService consumerExecutor; private static final int SHUTDOWN_TIMEOUT_IN_SECOND = 1; private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = - RedundantExceptionFilter.getRedundantExceptionFilter(); + new RedundantExceptionFilter(RedundantExceptionFilter.DEFAULT_BITSET_SIZE, TimeUnit.MINUTES.toMillis(10)); /** * @param statsOverride injection of stats, for test purposes From 169f21c5c272ce7272694f0b9be8df650b8a984d Mon Sep 17 00:00:00 2001 From: Hao Xu Date: Thu, 5 Dec 2024 17:35:11 -0800 Subject: [PATCH 3/3] Adjust bitmap size to 8MB in total --- .../linkedin/davinci/kafka/consumer/KafkaConsumerService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java index 7706c2e6d0b..3b577442a3d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java @@ -88,8 +88,9 @@ public abstract class KafkaConsumerService extends AbstractKafkaConsumerService private final Logger LOGGER; private final ExecutorService consumerExecutor; private static final int SHUTDOWN_TIMEOUT_IN_SECOND = 1; + // 4MB bitset size, 2 bitmaps for active and old bitset private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = - new RedundantExceptionFilter(RedundantExceptionFilter.DEFAULT_BITSET_SIZE, TimeUnit.MINUTES.toMillis(10)); + new RedundantExceptionFilter(8 * 1024 * 1024 * 4, TimeUnit.MINUTES.toMillis(10)); /** * @param statsOverride injection of stats, for test purposes