Kafka 3.x MetadataCache #12

2pc opened this issue Feb 28, 2022 · 0 comments

Two MetadataCache implementations

Because both Raft mode and ZooKeeper mode are supported, there are two implementations: KRaftMetadataCache and ZkMetadataCache.

Note that the internal state objects of the two differ:
1. ZkMetadataCache keeps its state in a field named metadataSnapshot
2. KRaftMetadataCache keeps its state in a field named _currentImage
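The split can be pictured as a shared interface over two differently shaped state holders. A minimal sketch with hypothetical names and toy state (the real MetadataCache trait exposes many more lookup methods):

```java
import java.util.concurrent.atomic.AtomicReference;

interface MiniMetadataCache {
    int brokerCount();
}

// ZK flavor: state lives in a "metadataSnapshot"-style field, replaced
// wholesale when an UpdateMetadataRequest is processed.
class MiniZkCache implements MiniMetadataCache {
    private volatile int metadataSnapshot = 0; // toy stand-in for the snapshot object
    public int brokerCount() { return metadataSnapshot; }
    void updateMetadata(int newSnapshot) { metadataSnapshot = newSnapshot; }
}

// KRaft flavor: state is an immutable image held in a "_currentImage"-style
// field, swapped in by the metadata listener.
class MiniKRaftCache implements MiniMetadataCache {
    private final AtomicReference<Integer> currentImage = new AtomicReference<>(0);
    public int brokerCount() { return currentImage.get(); }
    void setImage(int newImage) { currentImage.set(newImage); }
}
```

Either way, readers see a consistent snapshot: the ZK flavor replaces the whole snapshot object, the KRaft flavor swaps in a whole new image.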

ZkMetadataCache update flow

In ZK mode the update flow is handled explicitly inside KafkaApis:

//KafkaApis.handleUpdateMetadataRequest-->ReplicaManager.maybeUpdateMetadataCache-->zkMetadataCache.updateMetadata
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)

In older versions, zkMetadataCache.updateMetadata returned deletedPartitions directly when the call completed.
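That contract can be modeled in a few lines: replace the cached partition set and report what the new state dropped, so the caller can act on the removals. A toy sketch with hypothetical names, not the real ZkMetadataCache:

```java
import java.util.HashSet;
import java.util.Set;

class MiniZkMetadataCache {
    private Set<String> partitions = new HashSet<>();

    // Install a new snapshot and return the partitions that disappeared,
    // mirroring the old updateMetadata-returns-deletedPartitions contract.
    Set<String> updateMetadata(Set<String> newPartitions) {
        Set<String> deleted = new HashSet<>(partitions);
        deleted.removeAll(newPartitions);          // deleted = old minus new
        partitions = new HashSet<>(newPartitions); // replace the snapshot
        return deleted;
    }
}
```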

KRaftMetadataCache update flow

In Raft mode, BrokerServer registers a BrokerMetadataListener with raftManager at startup:

metadataListener = new BrokerMetadataListener(config.nodeId,
                                              time,
                                              threadNamePrefix,
                                              config.metadataSnapshotMaxNewRecordBytes,
                                              metadataSnapshotter)
// Register a listener with the Raft layer to receive metadata event notifications
raftManager.register(metadataListener)

Internally this registers the listener with KafkaRaftClient:

// KafkaRaftClient.java
public void register(Listener<T> listener) {
    pendingRegistrations.add(Registration.register(listener));
    wakeup();
}
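Note the hand-off: register only enqueues and wakes the poll loop; the listener list itself is mutated exclusively by the single poll thread when it drains the queue. A minimal sketch of that pattern (hypothetical names, not the actual KafkaRaftClient internals):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class MiniRaftClient {
    interface Listener { }

    private final ConcurrentLinkedQueue<Listener> pendingRegistrations =
        new ConcurrentLinkedQueue<>();
    private final List<Listener> listeners = new ArrayList<>(); // poll thread only

    // Any thread may call this, like KafkaRaftClient.register:
    // enqueue the registration and nudge the poll loop.
    void register(Listener listener) {
        pendingRegistrations.add(listener);
        wakeup();
    }

    void wakeup() {
        // In the real client this unblocks the poll thread's messageQueue.poll.
    }

    // Called at the top of each poll() iteration, on the single poll thread.
    void pollListeners() {
        Listener l;
        while ((l = pendingRegistrations.poll()) != null) {
            listeners.add(l);
        }
    }

    int listenerCount() { return listeners.size(); }
}
```

This keeps the listener list free of locks: cross-thread traffic goes through the concurrent queue, while the list is confined to one thread.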

Poll-loop call chain:
KafkaRaftManager.run() --> KafkaRaftManager.doWork() --> KafkaRaftClient.poll()

    /**
     * Poll for new events. This allows the client to handle inbound
     * requests and send any needed outbound requests.
     */
    public void poll() {
        pollListeners(); // drains pendingRegistrations; BrokerMetadataListener is hooked in here

        long currentTimeMs = time.milliseconds();
        if (maybeCompleteShutdown(currentTimeMs)) {
            return;
        }

        long pollStateTimeoutMs = pollCurrentState(currentTimeMs);
        long cleaningTimeoutMs = snapshotCleaner.maybeClean(currentTimeMs);
        long pollTimeoutMs = Math.min(pollStateTimeoutMs, cleaningTimeoutMs);

        kafkaRaftMetrics.updatePollStart(currentTimeMs);

        RaftMessage message = messageQueue.poll(pollTimeoutMs); // pull a Raft request; ControllerApis dispatches on the ApiKeys

        currentTimeMs = time.milliseconds();
        kafkaRaftMetrics.updatePollEnd(currentTimeMs);

        if (message != null) {
            handleInboundMessage(message, currentTimeMs); // handle the request
        }
    }

BrokerMetadataListener

BrokerMetadataListener implements the RaftClient.Listener interface; its two main methods are:

void handleCommit(BatchReader<T> reader);
void handleSnapshot(SnapshotReader<T> reader);

When either method fires, the KRaftMetadataCache's _currentImage gets updated, and the change is published through a MetadataPublisher.

  /**
   * Handle new metadata records.
   */
  override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit =
    eventQueue.append(new HandleCommitsEvent(reader))

  class HandleCommitsEvent(reader: BatchReader[ApiMessageAndVersion])
      extends EventQueue.FailureLoggingEvent(log) {
    override def run(): Unit = {
      val results = try {
        val loadResults = loadBatches(_delta, reader, None, None, None)
        if (isDebugEnabled) {
          debug(s"Loaded new commits: ${loadResults}")
        }
        loadResults
      } finally {
        reader.close()
      }
      _publisher.foreach(publish)

      snapshotter.foreach { snapshotter =>
        _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
        if (shouldSnapshot()) {
          if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
            _bytesSinceLastSnapshot = 0L
          }
        }
      }
    }
  }
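Note that handleCommit does no work on the Raft thread: it wraps the reader in an event, appends it to a single-threaded event queue, and returns immediately. A minimal sketch of that deferral (hypothetical; a plain single-thread executor stands in for Kafka's EventQueue, and a counter for loadBatches):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MiniMetadataListener {
    private final ExecutorService eventQueue = Executors.newSingleThreadExecutor();
    final AtomicInteger batchesLoaded = new AtomicInteger();

    // Mirrors handleCommit: enqueue the event, return immediately, and let
    // the queue thread load the batches (and close the reader in a finally).
    void handleCommit(int batchCount) {
        eventQueue.submit(() -> {
            try {
                batchesLoaded.addAndGet(batchCount); // stand-in for loadBatches(...)
            } finally {
                // reader.close() lives here in the real HandleCommitsEvent
            }
        });
    }

    void shutdown() {
        eventQueue.shutdown();
        try {
            eventQueue.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Serializing all metadata events onto one queue thread is what lets _delta and _image be mutated without locks.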

Looking directly at the publish method:

  private def publish(publisher: MetadataPublisher): Unit = {
    val delta = _delta
    _image = _delta.apply()
    _delta = new MetadataDelta(_image)
    publisher.publish(delta, _image)
  }
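The rotation here is the key pattern: apply the accumulated delta to produce a new immutable image, start a fresh delta based on that image, then hand both to the publisher. A sketch of the same rotation with toy types (not Kafka's MetadataImage/MetadataDelta):

```java
import java.util.HashMap;
import java.util.Map;

class MiniImageStore {
    // Immutable "image": a frozen view of the metadata.
    static final class Image {
        final Map<String, Integer> topicPartitions;
        Image(Map<String, Integer> topicPartitions) {
            this.topicPartitions = Map.copyOf(topicPartitions);
        }
    }

    // Mutable "delta": changes recorded relative to a base image.
    static final class Delta {
        final Image base;
        final Map<String, Integer> changes = new HashMap<>();
        Delta(Image base) { this.base = base; }
        Image apply() { // fold the changes into a fresh immutable image
            Map<String, Integer> merged = new HashMap<>(base.topicPartitions);
            merged.putAll(changes);
            return new Image(merged);
        }
    }

    private Image image = new Image(Map.of());
    private Delta delta = new Delta(image);

    void record(String topic, int partitions) { delta.changes.put(topic, partitions); }

    // Mirrors publish(): apply the delta, start a fresh one based on the
    // new image, and hand the new image off (here, by returning it).
    Image publish() {
        image = delta.apply();
        delta = new Delta(image);
        return image;
    }
}
```

Because each published image is immutable, readers can hold on to it safely while the next delta accumulates.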

KRaftMetadataCache image update

BrokerMetadataPublisher then updates the metadataCache's image (i.e. replaces _currentImage):

override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
  metadataCache.setImage(newImage)
}
2pc added the kafka label Feb 28, 2022