diff --git a/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java index bda419eddca5..b9435f1ad6df 100644 --- a/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java +++ b/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.memory; +import java.util.Optional; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Utils; @@ -42,7 +43,7 @@ public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements Auto private volatile boolean alive = true; public GarbageCollectedMemoryPool(long sizeBytes, int maxSingleAllocationSize, boolean strict, Sensor oomPeriodSensor) { - super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor, null); + super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor, null, Optional.empty()); this.alive = true; this.gcListenerThread = new Thread(gcListener, "memory pool GC listener"); this.gcListenerThread.setDaemon(true); //so we dont need to worry about shutdown diff --git a/clients/src/main/java/org/apache/kafka/common/memory/MemoryPoolStatsStore.java b/clients/src/main/java/org/apache/kafka/common/memory/MemoryPoolStatsStore.java new file mode 100644 index 000000000000..5f9203a62395 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/memory/MemoryPoolStatsStore.java @@ -0,0 +1,80 @@ +package org.apache.kafka.common.memory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class MemoryPoolStatsStore { + private static final Logger log = LoggerFactory.getLogger(MemoryPoolStatsStore.class); + + private final AtomicInteger[] histogram; + private final int maxSizeBytes; + private final int segmentSizeBytes; + + public static class Range { + public final int startInclusive; + public final int endInclusive; + + public Range(int startInclusive, int endInclusive) { + this.startInclusive = startInclusive; + this.endInclusive = endInclusive; + } + + @Override + public String toString() { + return "Range{" + "startInclusive=" + startInclusive + ", endInclusive=" + endInclusive + '}'; + } + } + + public MemoryPoolStatsStore(int segments, int maxSizeBytes) { + histogram = new AtomicInteger[segments]; + this.maxSizeBytes = maxSizeBytes; + segmentSizeBytes = (int) Math.ceil((double) maxSizeBytes / segments); + for (int segmentIndex = 0; segmentIndex < segments; segmentIndex++) { + histogram[segmentIndex] = new AtomicInteger(); + } + } + + private int getSegmentIndexForBytes(int bytes) { + if (bytes == 0) { + throw new IllegalArgumentException("Requested zero bytes for allocation."); + } + if (bytes > maxSizeBytes) { + log.debug("Requested bytes {} for allocation exceeds maximum recorded value {}", bytes, maxSizeBytes); + return -1; + } else { + return (bytes - 1) / segmentSizeBytes; + } + } + + public void recordAllocation(int bytes) { + try { + final int segmentIndex = getSegmentIndexForBytes(bytes); + if (segmentIndex != -1) { + histogram[segmentIndex].incrementAndGet(); + } + } catch (IllegalArgumentException e) { + log.error("Encountered error when trying to record memory allocation for request", e); + } + } + + public synchronized Map getFrequencies() { + Map frequenciesMap = new HashMap<>(); + for (int segmentIndex = 0; segmentIndex < histogram.length; segmentIndex++) { + frequenciesMap.put(new Range( + segmentIndex * segmentSizeBytes + 1, + segmentIndex * segmentSizeBytes + segmentSizeBytes + ), histogram[segmentIndex].intValue()); + } + return frequenciesMap; + } + + public synchronized void clear() { + for (AtomicInteger atomicInteger : histogram) { + atomicInteger.set(0); + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java index f6f20808cfff..5f47c4429dd3 100644 --- a/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java +++ b/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.memory; import java.nio.ByteBuffer; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.common.metrics.Sensor; @@ -37,10 +38,13 @@ public class SimpleMemoryPool implements MemoryPool { protected final AtomicLong availableMemory; protected final int maxSingleAllocationSize; protected final AtomicLong startOfNoMemPeriod = new AtomicLong(); //nanoseconds + private final Optional memoryPoolStatsStore; protected volatile Sensor oomTimeSensor; protected volatile Sensor allocateSensor; - public SimpleMemoryPool(long sizeInBytes, int maxSingleAllocationBytes, boolean strict, Sensor oomPeriodSensor, Sensor allocateSensor) { + public SimpleMemoryPool(long sizeInBytes, int maxSingleAllocationBytes, boolean strict, Sensor oomPeriodSensor, + Sensor allocateSensor, Optional memoryPoolStatsStore) { + this.memoryPoolStatsStore = memoryPoolStatsStore; if (sizeInBytes <= 0 || maxSingleAllocationBytes <= 0 || maxSingleAllocationBytes > sizeInBytes) throw new IllegalArgumentException("must provide a positive size and max single allocation size smaller than size." + "provided " + sizeInBytes + " and " + maxSingleAllocationBytes + " respectively"); @@ -57,7 +61,8 @@ public ByteBuffer tryAllocate(int sizeBytes) { if (sizeBytes < 1) throw new IllegalArgumentException("requested size " + sizeBytes + "<=0"); if (sizeBytes > maxSingleAllocationSize) - throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize); + throw new IllegalArgumentException( + "requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize); long available; boolean success = false; @@ -114,6 +119,7 @@ public boolean isOutOfMemory() { //allows subclasses to do their own bookkeeping (and validation) _before_ memory is returned to client code. protected void bufferToBeReturned(ByteBuffer justAllocated) { this.allocateSensor.record(justAllocated.capacity()); + memoryPoolStatsStore.ifPresent(sizeStore -> sizeStore.recordAllocation(justAllocated.capacity())); log.trace("allocated buffer of size {} ", justAllocated.capacity()); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index e29ab6ad37c4..5a59a7b4f5cc 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -692,7 +692,7 @@ public void testPartialReceiveGracefulClose() throws Exception { public void testMuteOnOOM() throws Exception { //clean up default selector, replace it with one that uses a finite mem pool selector.close(); - MemoryPool pool = new SimpleMemoryPool(900, 900, false, null, sensor); + MemoryPool pool = new SimpleMemoryPool(900, 900, false, null, sensor, Optional.empty()); selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup", new HashMap(), true, false, channelBuilder, pool, new LogContext()); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index b1220de582b4..6c9581c7ce91 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.network; import java.nio.channels.SelectionKey; +import java.util.Optional; import javax.net.ssl.SSLEngine; import org.apache.kafka.common.config.SecurityConfig; @@ -288,7 +289,7 @@ public void testRenegotiationFails() throws Exception { public void testMuteOnOOM() throws Exception { //clean up default selector, replace it with one that uses a finite mem pool selector.close(); - MemoryPool pool = new SimpleMemoryPool(900, 900, false, null, sensor); + MemoryPool pool = new SimpleMemoryPool(900, 900, false, null, sensor, Optional.empty()); //the initial channel builder is for clients, we need a server one String tlsProtocol = "TLSv1.2"; File trustStoreFile = File.createTempFile("truststore", ".jks"); diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index c8133d613b73..8a2dd887662b 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -38,7 +38,7 @@ import kafka.utils._ import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.config.internals.QuotaConfigs import org.apache.kafka.common.errors.InvalidRequestException -import org.apache.kafka.common.memory.{MemoryPool, RecyclingMemoryPool, SimpleMemoryPool} +import org.apache.kafka.common.memory.{MemoryPool, RecyclingMemoryPool, MemoryPoolStatsStore, SimpleMemoryPool} import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Max, Meter, Percentile, Percentiles, Rate} @@ -79,9 +79,19 @@ class SocketServer(val config: KafkaConfig, val time: Time, val credentialProvider: CredentialProvider, val observer: Observer, - val apiVersionManager: ApiVersionManager) + val apiVersionManager: ApiVersionManager, + val memoryPoolStatsStore: Optional[MemoryPoolStatsStore]) extends Logging with KafkaMetricsGroup with BrokerReconfigurable { + def this(config: KafkaConfig, + metrics: Metrics, + time: Time, + credentialProvider: CredentialProvider, + observer: Observer, + apiVersionManager: ApiVersionManager) { + this(config, metrics, time, credentialProvider, observer, apiVersionManager, Optional.empty()) + } + private val maxQueuedRequests = config.queuedMaxRequests private val nodeId = config.brokerId @@ -100,7 +110,7 @@ class SocketServer(val config: KafkaConfig, private val percentiles = (1 to 9).map( i => new Percentile(metrics.metricName("MemoryPoolAllocateSize%dPercentile".format(i * 10), MetricsGroup), i * 10)) // At current stage, we do not know the max decrypted request size, temporarily set it to 10MB. memoryPoolAllocationSensor.add(new Percentiles(400, 0.0, 10485760, BucketSizing.CONSTANT, percentiles:_*)) - private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolUsageSensor, memoryPoolAllocationSensor) + private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolUsageSensor, memoryPoolAllocationSensor, memoryPoolStatsStore) else if (config.socketRequestCommonBytes > 0) new RecyclingMemoryPool(config.socketRequestCommonBytes, config.socketRequestBufferCacheSize, memoryPoolAllocationSensor) else MemoryPool.NONE // data-plane diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 591b031c3346..929bb9b75310 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -84,6 +84,11 @@ object Defaults { val AllowPreferredControllerFallback = true val MissingPerTopicConfig = "-1" + val MemoryPoolStatsLoggingEnable = false + val MemoryPoolStatsMaxSize = Integer.MAX_VALUE + val MemoryPoolStatsNumSegments = 1000 + val MemoryPoolStatsLoggingFrequencyMinutes = 60 + val UnofficialClientLoggingEnable = false val UnofficialClientCacheTtl = 1 val ExpectedClientSoftwareNames = util.Arrays.asList( @@ -441,6 +446,10 @@ object KafkaConfig { val UnofficialClientLoggingEnableProp = "unofficial.client.logging.enable" val UnofficialClientCacheTtlProp = "unofficial.client.cache.ttl" val ExpectedClientSoftwareNamesProp = "expected.client.software.names" + val MemoryPoolStatsLoggingEnableProp = "memory.pool.stats.logging.enable" + val MemoryPoolStatsMaxSizeProp = "memory.pool.stats.max.size" + val MemoryPoolStatsNumSegmentsProp = "memory.pool.stats.num.segments" + val MemoryPoolStatsLoggingFrequencyMinutesProp = "memory.pool.stats.logging.frequency.minutes" /************* Authorizer Configuration ***********/ val AuthorizerClassNameProp = "authorizer.class.name" @@ -780,6 +789,10 @@ object KafkaConfig { val UnofficialClientLoggingEnableDoc = "Controls whether logging occurs when an ApiVersionsRequest is received from a client unsupported by LinkedIn, such as an Apache Kafka client." val UnofficialClientCacheTtlDoc = "The amount of time (in hours) for the identity of an unofficial client to live in the local cache to avoid duplicate log messages." val ExpectedClientSoftwareNamesDoc = "The software names of clients that are supported by LinkedIn, such as Avro, Raw, and Tracking clients." + val MemoryPoolStatsLoggingEnableDoc = "Specifies whether memory pool statistics should be logged." + val MemoryPoolStatsMaxSizeDoc = "Maximum size of memory allocation which will be recorded if memory pool statistics logging is enabled." + val MemoryPoolStatsNumSegmentsDoc = "The number of segments into which the memory pool statistics histogram will be broken." + val MemoryPoolStatsLoggingFrequencyMinutesDoc = "The frequency in minutes at which memory pool statistics will be recorded." /************* Authorizer Configuration ***********/ val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements s${classOf[Authorizer].getName}" + @@ -1214,6 +1227,10 @@ object KafkaConfig { .define(UnofficialClientLoggingEnableProp, BOOLEAN, Defaults.UnofficialClientLoggingEnable, LOW, UnofficialClientLoggingEnableDoc) .define(UnofficialClientCacheTtlProp, LONG, Defaults.UnofficialClientCacheTtl, LOW, UnofficialClientCacheTtlDoc) .define(ExpectedClientSoftwareNamesProp, LIST, Defaults.ExpectedClientSoftwareNames, LOW, ExpectedClientSoftwareNamesDoc) + .define(MemoryPoolStatsLoggingEnableProp, BOOLEAN, Defaults.MemoryPoolStatsLoggingEnable, LOW, MemoryPoolStatsLoggingEnableDoc) + .define(MemoryPoolStatsMaxSizeProp, INT, Defaults.MemoryPoolStatsMaxSize, LOW, MemoryPoolStatsMaxSizeDoc) + .define(MemoryPoolStatsNumSegmentsProp, INT, Defaults.MemoryPoolStatsNumSegments, LOW, MemoryPoolStatsNumSegmentsDoc) + .define(MemoryPoolStatsLoggingFrequencyMinutesProp, INT, Defaults.MemoryPoolStatsLoggingFrequencyMinutes, LOW, MemoryPoolStatsLoggingFrequencyMinutesDoc) /************* Authorizer Configuration ***********/ .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc) @@ -1723,6 +1740,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO def unofficialClientCacheTtl = getLong(KafkaConfig.UnofficialClientCacheTtlProp) def expectedClientSoftwareNames = getList(KafkaConfig.ExpectedClientSoftwareNamesProp) + def memoryPoolStatsLoggingEnable = getBoolean(KafkaConfig.MemoryPoolStatsLoggingEnableProp) + def memoryPoolStatsMaxSize = getInt(KafkaConfig.MemoryPoolStatsMaxSizeProp) + def memoryPoolStatsNumSegments = getInt(KafkaConfig.MemoryPoolStatsNumSegmentsProp) + def memoryPoolStatsLoggingFrequencyMinutes = getInt(KafkaConfig.MemoryPoolStatsLoggingFrequencyMinutesProp) + def getNumReplicaAlterLogDirsThreads: Int = { val numThreads: Integer = Option(getInt(KafkaConfig.NumReplicaAlterLogDirsThreadsProp)).getOrElse(logDirs.size) numThreads diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 9357d67be200..6e1a5696390e 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -37,6 +37,7 @@ import kafka.utils._ import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient} import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils} import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.memory.MemoryPoolStatsStore import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.message.ControlledShutdownRequestData import org.apache.kafka.common.metrics.Metrics @@ -46,13 +47,14 @@ import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledSh import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.security.{JaasContext, JaasUtils} -import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils, PoisonPill} +import org.apache.kafka.common.utils.{AppInfoParser, LogContext, PoisonPill, Time, Utils} import org.apache.kafka.common.{Endpoint, Node, TopicPartition} import org.apache.kafka.metadata.BrokerState import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.zookeeper.client.ZKClientConfig +import java.util.Optional import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ import scala.collection.mutable.{ArrayBuffer, Buffer} @@ -323,13 +325,43 @@ class KafkaServer( observer = Observer(config) + def initializeMemoryPoolStats(): Optional[MemoryPoolStatsStore] = { + if (!config.memoryPoolStatsLoggingEnable) { + return Optional.empty() + } + + info(s"Memory pool stats logging is enabled, segments = " + + s"${config.memoryPoolStatsNumSegments}, max size = ${config.memoryPoolStatsMaxSize}, " + + s"logging frequency in minutes = ${config.memoryPoolStatsLoggingFrequencyMinutes}") + val memoryPoolStatsStore = new MemoryPoolStatsStore(config.memoryPoolStatsNumSegments, config.memoryPoolStatsMaxSize) + val requestStatsLogger = new MemoryPoolStatsLogger() + + def publishHistogramToLog(): Unit = { + info("Publishing memory pool stats") + requestStatsLogger.logStats(memoryPoolStatsStore) + memoryPoolStatsStore.clear() + } + + val histogramPublisher = new KafkaScheduler(threads = 1, "histogram-publisher-") + histogramPublisher.startup() + histogramPublisher.schedule(name = "publish-histogram-to-log", + fun = publishHistogramToLog, + period = config.memoryPoolStatsLoggingFrequencyMinutes.toLong, + unit = TimeUnit.MINUTES) + + Optional.of(memoryPoolStatsStore) + } + + val memoryPoolStatsStore = initializeMemoryPoolStats() + // Create and start the socket server acceptor threads so that the bound port is known. // Delay starting processors until the end of the initialization sequence to ensure // that credentials have been loaded before processing authentications. // // Note that we allow the use of KRaft mode controller APIs when forwarding is enabled // so that the Envelope request is exposed. This is only used in testing currently. - socketServer = new SocketServer(config, metrics, time, credentialProvider, observer, apiVersionManager) + socketServer = new SocketServer( + config, metrics, time, credentialProvider, observer, apiVersionManager, memoryPoolStatsStore) socketServer.startup(startProcessingRequests = false) /* start replica manager */ diff --git a/core/src/main/scala/kafka/server/MemoryPoolStatsLogger.scala b/core/src/main/scala/kafka/server/MemoryPoolStatsLogger.scala new file mode 100644 index 000000000000..035e2f5434d5 --- /dev/null +++ b/core/src/main/scala/kafka/server/MemoryPoolStatsLogger.scala @@ -0,0 +1,23 @@ +package kafka.server + +import com.typesafe.scalalogging.Logger +import kafka.utils.Logging +import org.apache.kafka.common.memory.MemoryPoolStatsStore + +import scala.collection.JavaConverters._ + +object MemoryPoolStatsLogger { + private val logger = Logger("memory.pool.stats.logger") +} + +class MemoryPoolStatsLogger extends Logging { + override lazy val logger = MemoryPoolStatsLogger.logger + + def logStats(memoryPoolStatsStore: MemoryPoolStatsStore): Unit = { + val frequencyList = memoryPoolStatsStore.getFrequencies.asScala.toSeq.sortBy(_._1.startInclusive) + frequencyList.foreach { + case (range, frequency) => + info(s"[${range.startInclusive}-${range.endInclusive}] = $frequency") + } + } +}