Skip to content

Commit

Permalink
Add QuotaV2Handler into produce path
Browse files Browse the repository at this point in the history
  • Loading branch information
nickgarvey committed Jan 5, 2024
1 parent 4253d39 commit 700b20e
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 6 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class BrokerServer(

var authorizer: Option[Authorizer] = None
var observer: Observer = null
var quotaV2Handler: QuotaV2Handler = null
var socketServer: SocketServer = null
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null

Expand Down Expand Up @@ -237,6 +238,7 @@ class BrokerServer(
)

observer = Observer(config)
quotaV2Handler = QuotaV2Handler(config)

// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
Expand Down Expand Up @@ -389,7 +391,7 @@ class BrokerServer(
val raftSupport = RaftSupport(forwardingManager, metadataCache)
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, observer, quotaManagers,
config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, observer, quotaV2Handler, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)

dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
Expand Down
11 changes: 10 additions & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val metrics: Metrics,
val authorizer: Option[Authorizer],
val observer: Observer,
val quotaV2Handler: QuotaV2Handler,
val quotas: QuotaManagers,
val fetchManager: FetchManager,
brokerTopicStats: BrokerTopicStats,
Expand Down Expand Up @@ -612,6 +613,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
val throttledTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
// cache the result to avoid redundant authorization calls
val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC,
Expand All @@ -629,6 +631,8 @@ class KafkaApis(val requestChannel: RequestChannel,
unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition))
nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
else if (quotaV2Handler.checkLimit(topicPartition, produceRequest) == QuotaV2Decision.DENY)
throttledTopicResponses += topicPartition -> new PartitionResponse(Errors.THROTTLING_QUOTA_EXCEEDED)
else
try {
ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords)
Expand All @@ -645,7 +649,8 @@ class KafkaApis(val requestChannel: RequestChannel,
// https://issues.apache.org/jira/browse/KAFKA-10730
@nowarn("cat=deprecation")
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++
invalidRequestResponses ++ throttledTopicResponses
var errorInResponse = false
instrumentation.markStage(Stage.BeginResponseCallback)
instrumentation.appliedTopicPartitions = responseStatus.keys // logging relies on the info
Expand All @@ -658,6 +663,9 @@ class KafkaApis(val requestChannel: RequestChannel,
request.header.clientId,
topicPartition,
status.error.exceptionName))
} else {
// Record the charge for the QuotaV2Handler
quotaV2Handler.recordCharge(topicPartition, produceRequest, status)
}
}

Expand Down Expand Up @@ -687,6 +695,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.throttle(quotas.produce, request, effectiveBandWidthThrottleTime)
requestHelper.throttle(quotas.request, request, effectiveRequestThrottleTime)


// Send the response immediately. In case of throttling, the channel has already been muted.
if (produceRequest.acks == 0) {
// We intentionally don't instrument acks=0 requests,
Expand Down
19 changes: 19 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ object Defaults {
/** ********* Broker-side configuration ***********/
val ObserverClassName = "kafka.server.NoOpObserver"
val ObserverShutdownTimeoutMs = 2000
val QuotaV2HandlerClassName = "kafka.server.NoOpQuotaV2Handler"
val QuotaV2HandlerShutdownTimeoutMs = 2000

/** ********* Socket Server Configuration ***********/
val Listeners = "PLAINTEXT://:9092"
Expand Down Expand Up @@ -479,6 +481,10 @@ object KafkaConfig {
val ObserverClassNameProp = "observer.class.name"
val ObserverShutdownTimeoutMsProp = "observer.shutdown.timeout"

/** ********* Broker-side quotav2handler Configuration *************** */
val QuotaV2HandlerClassNameProp = "quotav2handler.class.name"
val QuotaV2HandlerShutdownTimeoutMsProp = "quotav2handler.shutdown.timeout"

/** ********* Socket Server Configuration ***********/
val ListenersProp = "listeners"
val AdvertisedListenersProp = "advertised.listeners"
Expand Down Expand Up @@ -1189,6 +1195,11 @@ object KafkaConfig {
"zero. When closing/shutting down an observer, most time is spent on flushing the observed stats. The reasonable timeout should be close to " +
"the time it takes to flush the stats."

/** ********* Broker-side Observer Configuration ******** */
val QuotaV2HandlerClassNameDoc = "The name of the QuotaV2Handler class that is used to approve/deny produce requests."
val QuotaV2HandlerShutdownTimeoutMsDoc = "The maximum time of closing/shutting down a QuotaV2Handler. " +
"This property can not be less than or equal to zero."

private[server] val configDef = {
import ConfigDef.Importance._
import ConfigDef.Range._
Expand Down Expand Up @@ -1288,6 +1299,10 @@ object KafkaConfig {
.define(ObserverClassNameProp, STRING, Defaults.ObserverClassName, MEDIUM, ObserverClassNameDoc)
.define(ObserverShutdownTimeoutMsProp, LONG, Defaults.ObserverShutdownTimeoutMs, atLeast(1), MEDIUM, ObserverShutdownTimeoutMsDoc)

/************* Broker-side Observer Configuration ***********/
.define(QuotaV2HandlerClassNameProp, STRING, Defaults.QuotaV2HandlerClassName, MEDIUM, QuotaV2HandlerClassNameDoc)
.define(QuotaV2HandlerShutdownTimeoutMsProp, LONG, Defaults.QuotaV2HandlerShutdownTimeoutMs, atLeast(1), MEDIUM, QuotaV2HandlerShutdownTimeoutMsDoc)

/** ********* Socket Server Configuration ***********/
.define(ListenersProp, STRING, Defaults.Listeners, HIGH, ListenersDoc)
.define(AdvertisedListenersProp, STRING, null, HIGH, AdvertisedListenersDoc)
Expand Down Expand Up @@ -1855,6 +1870,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val ObserverClassName: String = getString(KafkaConfig.ObserverClassNameProp)
val ObserverShutdownTimeoutMs: Long = getLong(KafkaConfig.ObserverShutdownTimeoutMsProp)

/** *********** Broker-side QuotaV2Handler Configuration ******* */
val QuotaV2HandlerClassName: String = getString(KafkaConfig.QuotaV2HandlerClassNameProp)
val QuotaV2HandlerShutdownTimeoutMs: Long = getLong(KafkaConfig.QuotaV2HandlerShutdownTimeoutMsProp)

/** ********* Socket Server Configuration ***********/
val socketSendBufferBytes = getInt(KafkaConfig.SocketSendBufferBytesProp)
val socketReceiveBufferBytes = getInt(KafkaConfig.SocketReceiveBufferBytesProp)
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class KafkaServer(

var authorizer: Option[Authorizer] = None
var observer: Observer = null
var quotaV2Handler: QuotaV2Handler = null
var socketServer: SocketServer = null
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
Expand Down Expand Up @@ -346,6 +347,8 @@ class KafkaServer(

observer = Observer(config)

quotaV2Handler = QuotaV2Handler(config)

// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
Expand Down Expand Up @@ -470,15 +473,15 @@ class KafkaServer(
/* start processing requests */
val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache)
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, observer, quotaManagers,
autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, observer, quotaV2Handler, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)

dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, observer, quotaManagers,
autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, observer, quotaV2Handler, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)

controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
Expand Down Expand Up @@ -839,6 +842,9 @@ class KafkaServer(

CoreUtils.swallow(observer.close(config.ObserverShutdownTimeoutMs, TimeUnit.MILLISECONDS), this)

// TODO ngarvey to config
CoreUtils.swallow(quotaV2Handler.close(1000, TimeUnit.MILLISECONDS), this)

if (adminManager != null)
CoreUtils.swallow(adminManager.shutdown(), this)

Expand Down
33 changes: 33 additions & 0 deletions core/src/main/scala/kafka/server/NoOpQuotaV2Handler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}

import java.util
import java.util.concurrent.TimeUnit

/**
* A NoOp handler instantiated by default when no other quota handler is being used.
*/
class NoOpQuotaV2Handler extends QuotaV2Handler {
override def configure(configs: util.Map[String, _]): Unit = ()
override def checkLimit(topicPartitions: TopicPartition, request: ProduceRequest): QuotaV2Decision.Value = QuotaV2Decision.APPROVE
override def recordCharge(topicPartitions: TopicPartition, request: ProduceRequest, response: ProduceResponse.PartitionResponse): Unit = ()
override def close(timeout: Long, unit: TimeUnit): Unit = ()
}
76 changes: 76 additions & 0 deletions core/src/main/scala/kafka/server/QuotaV2Handler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import java.util.concurrent.TimeUnit
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.apache.kafka.common.{Configurable, TopicPartition}

object QuotaV2Decision extends Enumeration {
val APPROVE, DENY = Value
}

/**
* A class implementing QuotaV2Handler is used to APPROVE or DENY produce requests. The intention is to
* limit the amount of data handled and stored, to avoid excessive usage of the Kafka cluster
* either for availability or capacity reasons.
*/
trait QuotaV2Handler extends Configurable {

/**
* Concrete classes implementing checkLimit should return a APPROVE or DENY decision, but should
* not charge a request against the quota until recordCharge is called.
*/
def checkLimit(topicPartition: TopicPartition, request: ProduceRequest): QuotaV2Decision.Value

/**
* recordCharge is used to update the quota backend with the quota consumed by the request.
* recordCharge should be called after an APPROVE decision is made.
*/
def recordCharge(topicPartition: TopicPartition, request: ProduceRequest, response: ProduceResponse.PartitionResponse): Unit

/**
* Close the QuotaV2Handler with the given timeout.
*
* @param timeout the maximum time to wait to close the handler.
* @param unit the time unit.
*/
def close(timeout: Long, unit: TimeUnit): Unit
}

object QuotaV2Handler extends Logging {
/**
* Create a new QuotaV2Handler from the given Kafka config.
* @param config the Kafka configuration defining the QuotaV2Handler properties.
* @return A configured instance of QuotaV2Handler.
*/
def apply(config: KafkaConfig): QuotaV2Handler = {
val quotaV2Handler = try {
CoreUtils.createObject[QuotaV2Handler](config.QuotaV2HandlerClassName)
} catch {
case e: Exception =>
error(s"Creating QuotaV2Handler instance from the given class name ${config.QuotaV2HandlerClassName} failed.", e)
new NoOpQuotaV2Handler
}
quotaV2Handler.configure(config.originals())
quotaV2Handler
}
}


Loading

0 comments on commit 700b20e

Please sign in to comment.