From 700b20e798fa30e43dfb4bde84510be767376919 Mon Sep 17 00:00:00 2001 From: Nick Garvey Date: Tue, 5 Dec 2023 20:47:01 +0000 Subject: [PATCH] Add QuotaV2Handler into produce path --- .../scala/kafka/server/BrokerServer.scala | 4 +- .../main/scala/kafka/server/KafkaApis.scala | 11 ++- .../main/scala/kafka/server/KafkaConfig.scala | 19 +++++ .../main/scala/kafka/server/KafkaServer.scala | 10 ++- .../kafka/server/NoOpQuotaV2Handler.scala | 33 +++++++ .../scala/kafka/server/QuotaV2Handler.scala | 76 +++++++++++++++++ .../unit/kafka/server/KafkaApisTest.scala | 85 ++++++++++++++++++- .../unit/kafka/server/KafkaConfigTest.scala | 4 + .../kafka/server/QuotaV2HandlerTest.scala | 33 +++++++ .../metadata/MetadataRequestBenchmark.java | 3 + 10 files changed, 272 insertions(+), 6 deletions(-) create mode 100644 core/src/main/scala/kafka/server/NoOpQuotaV2Handler.scala create mode 100644 core/src/main/scala/kafka/server/QuotaV2Handler.scala create mode 100755 core/src/test/scala/unit/kafka/server/QuotaV2HandlerTest.scala diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 01f18cdcef005..415605ab968b3 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -108,6 +108,7 @@ class BrokerServer( var authorizer: Option[Authorizer] = None var observer: Observer = null + var quotaV2Handler: QuotaV2Handler = null var socketServer: SocketServer = null var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null @@ -237,6 +238,7 @@ class BrokerServer( ) observer = Observer(config) + quotaV2Handler = QuotaV2Handler(config) // Create and start the socket server acceptor threads so that the bound port is known. // Delay starting processors until the end of the initialization sequence to ensure @@ -389,7 +391,7 @@ class BrokerServer( val raftSupport = RaftSupport(forwardingManager, metadataCache) dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport, replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager, - config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, observer, quotaManagers, + config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, observer, quotaV2Handler, quotaManagers, fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager) dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index a47065a2e31dd..8bc111052895e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -107,6 +107,7 @@ class KafkaApis(val requestChannel: RequestChannel, val metrics: Metrics, val authorizer: Option[Authorizer], val observer: Observer, + val quotaV2Handler: QuotaV2Handler, val quotas: QuotaManagers, val fetchManager: FetchManager, brokerTopicStats: BrokerTopicStats, @@ -612,6 +613,7 @@ class KafkaApis(val requestChannel: RequestChannel, val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]() + val throttledTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() // cache the result to avoid 
redundant authorization calls val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, @@ -629,6 +631,8 @@ class KafkaApis(val requestChannel: RequestChannel, unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED) else if (!metadataCache.contains(topicPartition)) nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) + else if (quotaV2Handler.checkLimit(topicPartition, produceRequest) == QuotaV2Decision.DENY) + throttledTopicResponses += topicPartition -> new PartitionResponse(Errors.THROTTLING_QUOTA_EXCEEDED) else try { ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords) @@ -645,7 +649,8 @@ class KafkaApis(val requestChannel: RequestChannel, // https://issues.apache.org/jira/browse/KAFKA-10730 @nowarn("cat=deprecation") def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { - val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses + val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ + invalidRequestResponses ++ throttledTopicResponses var errorInResponse = false instrumentation.markStage(Stage.BeginResponseCallback) instrumentation.appliedTopicPartitions = responseStatus.keys // logging relies on the info @@ -658,6 +663,9 @@ class KafkaApis(val requestChannel: RequestChannel, request.header.clientId, topicPartition, status.error.exceptionName)) + } else { + // Record the charge for the QuotaV2Handler + quotaV2Handler.recordCharge(topicPartition, produceRequest, status) } } @@ -687,6 +695,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.throttle(quotas.produce, request, effectiveBandWidthThrottleTime) requestHelper.throttle(quotas.request, request, effectiveRequestThrottleTime) + // Send the response immediately. In case of throttling, the channel has already been muted. if (produceRequest.acks == 0) { // We intentionally don't instrument acks=0 requests, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index eeea8cf750bd8..88b28872265c9 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -103,6 +103,8 @@ object Defaults { /** ********* Broker-side configuration ***********/ val ObserverClassName = "kafka.server.NoOpObserver" val ObserverShutdownTimeoutMs = 2000 + val QuotaV2HandlerClassName = "kafka.server.NoOpQuotaV2Handler" + val QuotaV2HandlerShutdownTimeoutMs = 2000 /** ********* Socket Server Configuration ***********/ val Listeners = "PLAINTEXT://:9092" @@ -479,6 +481,10 @@ object KafkaConfig { val ObserverClassNameProp = "observer.class.name" val ObserverShutdownTimeoutMsProp = "observer.shutdown.timeout" + /** ********* Broker-side quotav2handler Configuration *************** */ + val QuotaV2HandlerClassNameProp = "quotav2handler.class.name" + val QuotaV2HandlerShutdownTimeoutMsProp = "quotav2handler.shutdown.timeout" + /** ********* Socket Server Configuration ***********/ val ListenersProp = "listeners" val AdvertisedListenersProp = "advertised.listeners" @@ -1189,6 +1195,11 @@ object KafkaConfig { "zero. When closing/shutting down an observer, most time is spent on flushing the observed stats. The reasonable timeout should be close to " + "the time it takes to flush the stats." 
+  /** ********* Broker-side QuotaV2Handler Configuration ******** */
+  val QuotaV2HandlerClassNameDoc = "The name of the QuotaV2Handler class that is used to approve/deny produce requests."
+  val QuotaV2HandlerShutdownTimeoutMsDoc = "The maximum time to wait when closing/shutting down a QuotaV2Handler. " +
+    "This property must be greater than zero."
+
   private[server] val configDef = {
     import ConfigDef.Importance._
     import ConfigDef.Range._
@@ -1288,6 +1299,10 @@ object KafkaConfig {
       .define(ObserverClassNameProp, STRING, Defaults.ObserverClassName, MEDIUM, ObserverClassNameDoc)
       .define(ObserverShutdownTimeoutMsProp, LONG, Defaults.ObserverShutdownTimeoutMs, atLeast(1), MEDIUM, ObserverShutdownTimeoutMsDoc)
 
+      /************* Broker-side QuotaV2Handler Configuration ***********/
+      .define(QuotaV2HandlerClassNameProp, STRING, Defaults.QuotaV2HandlerClassName, MEDIUM, QuotaV2HandlerClassNameDoc)
+      .define(QuotaV2HandlerShutdownTimeoutMsProp, LONG, Defaults.QuotaV2HandlerShutdownTimeoutMs, atLeast(1), MEDIUM, QuotaV2HandlerShutdownTimeoutMsDoc)
+
       /** ********* Socket Server Configuration ***********/
       .define(ListenersProp, STRING, Defaults.Listeners, HIGH, ListenersDoc)
       .define(AdvertisedListenersProp, STRING, null, HIGH, AdvertisedListenersDoc)
@@ -1855,6 +1870,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val ObserverClassName: String = getString(KafkaConfig.ObserverClassNameProp)
   val ObserverShutdownTimeoutMs: Long = getLong(KafkaConfig.ObserverShutdownTimeoutMsProp)
 
+  /** *********** Broker-side QuotaV2Handler Configuration ******* */
+  val QuotaV2HandlerClassName: String = getString(KafkaConfig.QuotaV2HandlerClassNameProp)
+  val QuotaV2HandlerShutdownTimeoutMs: Long = getLong(KafkaConfig.QuotaV2HandlerShutdownTimeoutMsProp)
+
   /** ********* Socket Server Configuration ***********/
   val socketSendBufferBytes = getInt(KafkaConfig.SocketSendBufferBytesProp)
   val socketReceiveBufferBytes = getInt(KafkaConfig.SocketReceiveBufferBytesProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f75773ae93bf6..bdb7cb2e971cb 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -134,6 +134,7 @@ class KafkaServer(
   var authorizer: Option[Authorizer] = None
   var observer: Observer = null
+  var quotaV2Handler: QuotaV2Handler = null
   var socketServer: SocketServer = null
   var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
   var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
@@ -346,6 +347,8 @@ class KafkaServer(
     observer = Observer(config)
 
+    quotaV2Handler = QuotaV2Handler(config)
+
     // Create and start the socket server acceptor threads so that the bound port is known.
     // Delay starting processors until the end of the initialization sequence to ensure
     // that credentials have been loaded before processing authentications.
@@ -470,7 +473,7 @@ class KafkaServer(
     /* start processing requests */
     val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache)
     dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
-      autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, observer, quotaManagers,
+      autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, observer, quotaV2Handler, quotaManagers,
       fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
 
     dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
@@ -478,7 +481,7 @@ class KafkaServer(
     socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
       controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
-        autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, observer, quotaManagers,
+        autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, observer, quotaV2Handler, quotaManagers,
         fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
       controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
@@ -839,6 +842,9 @@ class KafkaServer(
         CoreUtils.swallow(observer.close(config.ObserverShutdownTimeoutMs, TimeUnit.MILLISECONDS), this)
 
+        // Close the QuotaV2Handler using its configured shutdown timeout.
+        CoreUtils.swallow(quotaV2Handler.close(config.QuotaV2HandlerShutdownTimeoutMs, TimeUnit.MILLISECONDS), this)
+
         if (adminManager != null)
           CoreUtils.swallow(adminManager.shutdown(), this)
 
diff --git a/core/src/main/scala/kafka/server/NoOpQuotaV2Handler.scala b/core/src/main/scala/kafka/server/NoOpQuotaV2Handler.scala
new file mode 100644
index 0000000000000..19a78330922f3
--- /dev/null
+++ b/core/src/main/scala/kafka/server/NoOpQuotaV2Handler.scala
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+/**
+ * A no-op handler, instantiated by default when no other quota handler is configured.
+ */
+class NoOpQuotaV2Handler extends QuotaV2Handler {
+  override def configure(configs: util.Map[String, _]): Unit = ()
+  override def checkLimit(topicPartition: TopicPartition, request: ProduceRequest): QuotaV2Decision.Value = QuotaV2Decision.APPROVE
+  override def recordCharge(topicPartition: TopicPartition, request: ProduceRequest, response: ProduceResponse.PartitionResponse): Unit = ()
+  override def close(timeout: Long, unit: TimeUnit): Unit = ()
+}
diff --git a/core/src/main/scala/kafka/server/QuotaV2Handler.scala b/core/src/main/scala/kafka/server/QuotaV2Handler.scala
new file mode 100644
index 0000000000000..6c4ea0bcc659d
--- /dev/null
+++ b/core/src/main/scala/kafka/server/QuotaV2Handler.scala
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.TimeUnit
+import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
+import org.apache.kafka.common.{Configurable, TopicPartition}
+
+object QuotaV2Decision extends Enumeration {
+  val APPROVE, DENY = Value
+}
+
+/**
+ * Implementations of QuotaV2Handler APPROVE or DENY produce requests. The intention is to
+ * limit the amount of data accepted and stored, protecting the availability and capacity
+ * of the Kafka cluster against excessive usage.
+ */
+trait QuotaV2Handler extends Configurable {
+
+  /**
+   * Concrete classes implementing checkLimit should return an APPROVE or DENY decision, but should
+   * not charge the request against the quota until recordCharge is called.
+   */
+  def checkLimit(topicPartition: TopicPartition, request: ProduceRequest): QuotaV2Decision.Value
+
+  /**
+   * recordCharge updates the quota backend with the usage consumed by the request. It is called
+   * after an APPROVE decision, for partitions whose append completed without error.
+   */
+  def recordCharge(topicPartition: TopicPartition, request: ProduceRequest, response: ProduceResponse.PartitionResponse): Unit
+
+  /**
+   * Close the QuotaV2Handler with the given timeout.
+   *
+   * @param timeout the maximum time to wait to close the handler.
+   * @param unit the time unit.
+   */
+  def close(timeout: Long, unit: TimeUnit): Unit
+}
+
+object QuotaV2Handler extends Logging {
+  /**
+   * Create a new QuotaV2Handler from the given Kafka config.
+   * @param config the Kafka configuration defining the QuotaV2Handler properties.
+   * @return A configured instance of QuotaV2Handler.
+ */ + def apply(config: KafkaConfig): QuotaV2Handler = { + val quotaV2Handler = try { + CoreUtils.createObject[QuotaV2Handler](config.QuotaV2HandlerClassName) + } catch { + case e: Exception => + error(s"Creating QuotaV2Handler instance from the given class name ${config.QuotaV2HandlerClassName} failed.", e) + new NoOpQuotaV2Handler + } + quotaV2Handler.configure(config.originals()) + quotaV2Handler + } +} + + diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index be929bb9d8fbe..554bf0d91f37b 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -103,6 +103,7 @@ class KafkaApisTest { private val brokerId = 1 private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId) private val observer: Observer = EasyMock.createNiceMock(classOf[Observer]) + private val quotaV2Handler: QuotaV2Handler = EasyMock.createNiceMock(classOf[QuotaV2Handler]) private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager]) private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager]) private val clientControllerQuotaManager: ControllerMutationQuotaManager = EasyMock.createNiceMock(classOf[ControllerMutationQuotaManager]) @@ -184,6 +185,7 @@ class KafkaApisTest { metrics, authorizer, observer, + quotaV2Handler, quotas, fetchManager, brokerTopicStats, @@ -1549,7 +1551,9 @@ class KafkaApisTest { for (version <- ApiKeys.PRODUCE.oldestVersion to ApiKeys.PRODUCE.latestVersion) { - EasyMock.reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator) + EasyMock.reset( + replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, quotaV2Handler + ) val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() @@ -1584,7 +1588,9 @@ class KafkaApisTest { EasyMock.expect(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( anyObject[RequestChannel.Request](), anyDouble, anyLong)).andReturn(0) - EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator) + EasyMock.replay( + replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, quotaV2Handler + ) createKafkaApis().handleProduceRequest(request, RequestLocal.withThreadConfinedCaching) @@ -3281,11 +3287,16 @@ class KafkaApisTest { EasyMock.expect(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(EasyMock.anyObject[RequestChannel.Request](), EasyMock.anyObject[Long])).andReturn(0) + EasyMock.expect(clientRequestQuotaManager.throttle( EasyMock.eq(request), EasyMock.anyObject[ThrottleCallback](), EasyMock.eq(0))) + EasyMock.expect(quotaV2Handler.checkLimit( + EasyMock.anyObject(), EasyMock.anyObject() + )).andReturn(QuotaV2Decision.APPROVE) + val capturedResponse = EasyMock.newCapture[AbstractResponse]() EasyMock.expect(requestChannel.sendResponse( EasyMock.eq(request), @@ -3995,4 +4006,74 @@ class KafkaApisTest { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleUpdateFeatures) } + + def setupQuotaV2Test(): (RequestChannel.Request, Capture[AbstractResponse]) = { + val topic = "topic" + addTopicToMetadataCache(topic, numPartitions = 2) + val tp = new TopicPartition("topic", 0) + val responseCallback: 
Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() + + val produceRequest = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection( + Collections.singletonList(new ProduceRequestData.TopicProduceData() + .setName(tp.topic).setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData() + .setIndex(tp.partition) + .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes)))))) + .iterator)) + .setAcks(1.toShort) + .setTimeoutMs(5000)) + .build(ApiKeys.PRODUCE.latestVersion) + + EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), + EasyMock.anyShort(), + EasyMock.eq(false), + EasyMock.eq(AppendOrigin.Client), + EasyMock.anyObject(), + EasyMock.capture(responseCallback), + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject()) + ).andAnswer(() => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NONE)))) + + val request = buildRequest(produceRequest) + val capturedResponse = expectNoThrottling(request) + EasyMock.reset(quotaV2Handler) + + EasyMock.expect(requestChannel.sendResponse( + EasyMock.eq(request), + EasyMock.capture(capturedResponse), + EasyMock.eq(None) + )) + + (request, capturedResponse) + } + + @Test + def testQuotaV2IsCalled(): Unit = { + val (request, capturedResponse) = setupQuotaV2Test() + + EasyMock.expect(quotaV2Handler.checkLimit(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(QuotaV2Decision.APPROVE) + + EasyMock.replay(replicaManager, clientRequestQuotaManager, clientQuotaManager, quotaV2Handler, requestChannel) + createKafkaApis().handleProduceRequest(request, RequestLocal.withThreadConfinedCaching) + + EasyMock.verify(quotaV2Handler) + assertEquals(1, capturedResponse.getValue.errorCounts().get(Errors.NONE)) + } + + @Test + def testQuotaV2DenyGivesError(): Unit = { + val (request, capturedResponse) = setupQuotaV2Test() + + EasyMock.expect(quotaV2Handler.checkLimit(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(QuotaV2Decision.DENY) + + EasyMock.replay(replicaManager, clientRequestQuotaManager, clientQuotaManager, quotaV2Handler, requestChannel) + createKafkaApis().handleProduceRequest(request, RequestLocal.withThreadConfinedCaching) + + EasyMock.verify(quotaV2Handler) + assertEquals(1, capturedResponse.getValue.errorCounts().get(Errors.THROTTLING_QUOTA_EXCEEDED)) + assertEquals(0, capturedResponse.getValue.errorCounts().getOrDefault(Errors.NONE, 0)) + } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 85bc474c4701b..aabd36a7c64bb 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -817,6 +817,10 @@ class KafkaConfigTest { case KafkaConfig.ObserverClassNameProp => // ignore since even if the class name is invalid, a NoOpObserver class is used instead case KafkaConfig.ObserverShutdownTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") + // QuotaV2Configs + case KafkaConfig.QuotaV2HandlerClassNameProp => // NoOpQuotaV2Handler is used if the classname is invalid + case KafkaConfig.QuotaV2HandlerShutdownTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") + // Raft Quorum Configs case RaftConfig.QUORUM_VOTERS_CONFIG => // ignore string case 
RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") diff --git a/core/src/test/scala/unit/kafka/server/QuotaV2HandlerTest.scala b/core/src/test/scala/unit/kafka/server/QuotaV2HandlerTest.scala new file mode 100755 index 0000000000000..010cf94ac7e23 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/QuotaV2HandlerTest.scala @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import org.junit.jupiter.api.Test +import kafka.utils.TestUtils +import org.junit.jupiter.api.Assertions._ + + +class QuotaV2HandlerTest { + @Test + def testConstruct(): Unit = { + val properties = TestUtils.createBrokerConfig(1, "") + val config = KafkaConfig.apply(properties) + val handler = QuotaV2Handler.apply(config) + assertTrue(handler.isInstanceOf[NoOpQuotaV2Handler]) + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java index 9f66521b9c73c..a5149c7b9c0b1 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java @@ -33,6 +33,7 @@ import kafka.server.KafkaConfig$; import kafka.server.MetadataCache; import kafka.server.Observer; +import kafka.server.QuotaV2Handler; import kafka.server.ZkMetadataCache; import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; @@ -100,6 +101,7 @@ public class MetadataRequestBenchmark { private RequestChannel.Metrics requestChannelMetrics = Mockito.mock(RequestChannel.Metrics.class); private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); private Observer observer = Mockito.mock(Observer.class); + private QuotaV2Handler quotaV2Handler = Mockito.mock(QuotaV2Handler.class); private GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class); private ZkAdminManager adminManager = Mockito.mock(ZkAdminManager.class); private TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class); @@ -188,6 +190,7 @@ private KafkaApis createKafkaApis() { metrics, Option.empty(), observer, + quotaV2Handler, quotaManagers, fetchManager, brokerTopicStats,
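
Reviewer note (not part of the patch): a minimal sketch of what a non-trivial QuotaV2Handler plugin could look like against the trait introduced above. The package and class name, the fixed one-second window scheme, and the "quotav2.example.max.bytes.per.second" property are all assumptions for illustration, not APIs or defaults added by this change.

package com.example.quota  // hypothetical package, not part of this patch

import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

import kafka.server.{QuotaV2Decision, QuotaV2Handler}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}

/**
 * Illustrative only: DENY produce requests once a fixed number of bytes has been charged
 * in the current one-second window. A real handler would consult a shared quota backend
 * rather than a per-broker, in-memory counter.
 */
class FixedWindowQuotaV2Handler extends QuotaV2Handler {
  private val maxBytesPerWindow = new AtomicLong(1024 * 1024) // illustrative 1 MiB/s default
  private val windowStartMs = new AtomicLong(System.currentTimeMillis())
  private val bytesInWindow = new AtomicLong(0L)

  override def configure(configs: util.Map[String, _]): Unit = {
    // QuotaV2Handler.apply passes config.originals(), so custom broker properties are visible here.
    // "quotav2.example.max.bytes.per.second" is a hypothetical property name.
    Option(configs.get("quotav2.example.max.bytes.per.second"))
      .foreach(v => maxBytesPerWindow.set(v.toString.toLong))
  }

  override def checkLimit(topicPartition: TopicPartition, request: ProduceRequest): QuotaV2Decision.Value = {
    maybeRollWindow()
    // Decide only; per the trait contract, nothing is charged until recordCharge is called.
    if (bytesInWindow.get() >= maxBytesPerWindow.get()) QuotaV2Decision.DENY else QuotaV2Decision.APPROVE
  }

  override def recordCharge(topicPartition: TopicPartition,
                            request: ProduceRequest,
                            response: ProduceResponse.PartitionResponse): Unit = {
    maybeRollWindow()
    // Charge the bytes that were appended for this partition against the current window.
    bytesInWindow.addAndGet(partitionBytes(topicPartition, request))
  }

  override def close(timeout: Long, unit: TimeUnit): Unit = ()

  // Size of the records destined for the given partition within this produce request.
  private def partitionBytes(topicPartition: TopicPartition, request: ProduceRequest): Long = {
    var bytes = 0L
    request.data().topicData().forEach { topicData =>
      if (topicData.name() == topicPartition.topic()) {
        topicData.partitionData().forEach { partitionData =>
          if (partitionData.index() == topicPartition.partition() && partitionData.records() != null)
            bytes += partitionData.records().sizeInBytes()
        }
      }
    }
    bytes
  }

  private def maybeRollWindow(): Unit = {
    val now = System.currentTimeMillis()
    if (now - windowStartMs.get() >= 1000L) {
      windowStartMs.set(now)
      bytesInWindow.set(0L)
    }
  }
}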
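
For completeness, a sketch of wiring such a handler in through the new quotav2handler.class.name property, following the same construction path the new QuotaV2HandlerTest uses. The custom class and the extra property are again assumptions; the snippet is meant for a test or worksheet context with the plugin on the broker classpath.

import kafka.server.{KafkaConfig, QuotaV2Handler}
import kafka.utils.TestUtils

// Build a broker config and point the new property at the custom handler class.
val props = TestUtils.createBrokerConfig(1, "")
props.put(KafkaConfig.QuotaV2HandlerClassNameProp, "com.example.quota.FixedWindowQuotaV2Handler")
props.put("quotav2.example.max.bytes.per.second", "5242880") // hypothetical plugin property
val config = KafkaConfig(props)
// QuotaV2Handler.apply configures the instance with config.originals(); if the class cannot be
// created it logs the error and falls back to NoOpQuotaV2Handler.
val quotaV2Handler = QuotaV2Handler(config)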