Skip to content

Kafka3.x 请求处理Apis #13

Open
@2pc

Description

@2pc

ApiRequestHandler

ApiRequestHandler一个接口,两种不同的实现

  1. zk模式KafkaApis
  2. raft模式ControllerApis
trait ApiRequestHandler {
  def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit
}

controllerApis

初始化apis,raft版本是controllerApis

//kafka.server.ControllerServer.scala
  controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
    authorizer,
    quotaManagers,
    time,
    supportedFeatures,
    controller,
    raftManager,
    config,
    metaProperties,
    controllerNodes.toSeq,
    apiVersionManager)
  controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
    socketServer.dataPlaneRequestChannel,
    controllerApis,
    time,
    config.numIoThreads,
    s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
    SocketServer.DataPlaneThreadPrefix)

KafkaRequestHandlerPool

KafkaRequestHandlerPool内初始化KafkaRequestHandler基本差不多

//KafkaRequestHandlerPool
this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
  createHandler(i)
}

def createHandler(id: Int): Unit = synchronized {
  runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
  KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
}

KafkaRequestHandler

KafkaRequestHandler 的run方法

 val req = requestChannel.receiveRequest(300)
apis.handle(request, requestLocal) //apis处理方法

ControllerApis 的实现

override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  try {
    request.header.apiKey match {
      case ApiKeys.FETCH => handleFetch(request)
      case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
      case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
      case ApiKeys.DELETE_TOPICS => handleDeleteTopics(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.ALTER_CONFIGS => handleLegacyAlterConfigs(request)
      case ApiKeys.VOTE => handleVote(request)
      case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request)
      case ApiKeys.END_QUORUM_EPOCH => handleEndQuorumEpoch(request)
      case ApiKeys.DESCRIBE_QUORUM => handleDescribeQuorum(request)
      case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
      case ApiKeys.BROKER_REGISTRATION => handleBrokerRegistration(request)
      case ApiKeys.BROKER_HEARTBEAT => handleBrokerHeartBeatRequest(request)
      case ApiKeys.UNREGISTER_BROKER => handleUnregisterBroker(request)
      case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotas(request)
      case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
      case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignments(request)
      case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignments(request)
      case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
      case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
      case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
      case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)
      case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
      case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request)
      case ApiKeys.ELECT_LEADERS => handleElectLeaders(request)
      case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
    }
  } catch {
    case e: FatalExitError => throw e
    case e: ExecutionException => requestHelper.handleError(request, e.getCause)
    case e: Throwable => requestHelper.handleError(request, e)
  }
}

receiveRequest只是从requestQueue获取请求

def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
  requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

Processor 处理selector事件

哪里放入requestQueue呢,必然是是socket 通过selector获取,按照之前版本的处理,看下Processor就好了

//Processor.run()
  while (isRunning) {
    try {
      // setup any new connections that have been queued up
      configureNewConnections()
      // register any new responses for writing
      processNewResponses()
      poll()//pollSelectionKeys处理selector事件,将请求放入completedReceives,completedSends
      processCompletedReceives()//获取completedReceives并发送给requestQueue
      processCompletedSends()//获取completedSends调用onComplete方法处理请求回调等
      processDisconnected()
      closeExcessConnections()
    } catch {
      // We catch all the throwables here to prevent the processor thread from exiting. We do this because
      // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
      // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
      // be either associated with a specific socket channel or a bad request. These exceptions are caught and
      // processed by the individual methods above which close the failing channel and continue processing other
      // channels. So this catch block should only ever see ControlThrowables.
      case e: Throwable => processException("Processor got uncaught exception.", e)
    }
  }

Metadata

Metadata

Assignees

No one assigned

    Labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions