Description
controller选举
zookeeper模式的kafka controller是基于抢占式的选举,即谁先在zk注册成功就是谁
看下raft模式是怎样的
1,启动
//kafka.Kafka.scala
def main(args: Array[String]): Unit = {
try {
val serverProps = getPropsFromArgs(args)
val server = buildServer(serverProps)
try server.startup()//KafkaRaftServer
KafkaRaftServer
override def startup(): Unit = {
Mx4jLoader.maybeLoad()
raftManager.startup()
controller.foreach(_.startup())
broker.foreach(_.startup())
AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
info(KafkaBroker.STARTED_MESSAGE)
}
1,启动raftManager
2.启动controller,这里是ControllerServer,内部会包装一个QuorumController,这个才是raft模式的controller
3,启动broker
QuorumController继承自Controller,扫了一眼Controller的代码,发现个有意思的接口和注释,
/**
* If this controller is active, this is the non-negative controller epoch.
* Otherwise, this is -1.
*/
int curClaimEpoch();
/**
* Returns true if this controller is currently active.
*/
default boolean isActive() {
return curClaimEpoch() != -1;
}
//QuorumController.java
@Override
public int curClaimEpoch() {
return curClaimEpoch;
}
controller的值不为-1,这里直接看curClaimEpoch的定义以及在哪里修改的就好了
除了初始化外,唯一更新这个epoch的地方位于QuorumController.QuorumMetaLogListener的handleLeaderChange方法内
public void handleLeaderChange(LeaderAndEpoch newLeader) {
if (newLeader.isLeader(nodeId)) {
final int newEpoch = newLeader.epoch();
appendRaftEvent("handleLeaderChange[" + newEpoch + "]", () -> {
int curEpoch = curClaimEpoch;
if (curEpoch != -1) {
throw new RuntimeException("Tried to claim controller epoch " +
newEpoch + ", but we never renounced controller epoch " +
curEpoch);
}
log.info(
"Becoming the active controller at epoch {}, committed offset {} and committed epoch {}.",
newEpoch, lastCommittedOffset, lastCommittedEpoch
);
curClaimEpoch = newEpoch;//这个是唯一赋值的地方,更新epoch
controllerMetrics.setActive(true);
writeOffset = lastCommittedOffset;
clusterControl.activate();
// Before switching to active, create an in-memory snapshot at the last committed offset. This is
// required because the active controller assumes that there is always an in-memory snapshot at the
// last committed offset.
snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
});
} else if (curClaimEpoch != -1) {
appendRaftEvent("handleRenounce[" + curClaimEpoch + "]", () -> {
log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " +
"log event. Reverting to last committed offset {}.", curClaimEpoch,
lastCommittedOffset);
renounce();
});
}
}
KafkaRaftClient
private def buildRaftClient(): KafkaRaftClient[T] = {
val expirationTimer = new SystemTimer("raft-expiration-executor")
val expirationService = new TimingWheelExpirationService(expirationTimer)
val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))
val nodeId = if (config.processRoles.contains(ControllerRole)) {
OptionalInt.of(config.nodeId)
} else {
OptionalInt.empty()
}
val client = new KafkaRaftClient(
recordSerde,
netChannel,
replicatedLog,
quorumStateStore,
time,
metrics,
expirationService,
logContext,
metaProperties.clusterId,
nodeId,
raftConfig
)
client.initialize()
client
}
看到onBecomeCandidate,onBecomeFollower就感觉很熟悉了,自然也有onBecomeLeader才对,Kafka有个初始状态
@Override
public void initialize() {
quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()));
long currentTimeMs = time.milliseconds();
if (quorum.isLeader()) {
throw new IllegalStateException("Voter cannot initialize as a Leader");
} else if (quorum.isCandidate()) {
onBecomeCandidate(currentTimeMs);
} else if (quorum.isFollower()) {
onBecomeFollower(currentTimeMs);
}
// When there is only a single voter, become candidate immediately
if (quorum.isVoter()
&& quorum.remoteVoters().isEmpty()
&& !quorum.isCandidate()) {
transitionToCandidate(currentTimeMs);
}
}
既然是raft,必然会启动选举loop,从candidate->leader,上边不是还有个raftManager么,RaftIoThread,
def doWork(): Unit
override def run(): Unit = {
isStarted = true
info("Starting")
try {
while (isRunning)
doWork()
} catch {
case e: FatalExitError =>
shutdownInitiated.countDown()
shutdownComplete.countDown()
info("Stopped")
Exit.exit(e.statusCode())
case e: Throwable =>
if (isRunning)
error("Error due to", e)
} finally {
shutdownComplete.countDown()
}
info("Stopped")
}
override def doWork(): Unit = {
client.poll()
}
发送voteRequest KafkaRaftClient client()-->pollCurrentState->pollCandidate()-->maybeSendVoteRequests()
private long pollCurrentState(long currentTimeMs) {
if (quorum.isLeader()) {
return pollLeader(currentTimeMs);
} else if (quorum.isCandidate()) {
return pollCandidate(currentTimeMs);
} else if (quorum.isFollower()) {
return pollFollower(currentTimeMs);
} else if (quorum.isVoted()) {
return pollVoted(currentTimeMs);
} else if (quorum.isUnattached()) {
return pollUnattached(currentTimeMs);
} else if (quorum.isResigned()) {
return pollResigned(currentTimeMs);
} else {
throw new IllegalStateException("Unexpected quorum state " + quorum);
}
}
初始状态应该是Unattached
候选,竞争leader选举 pollCandidate
private long pollCandidate(long currentTimeMs) {
CandidateState state = quorum.candidateStateOrThrow();
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) {
// If we happen to shutdown while we are a candidate, we will continue
// with the current election until one of the following conditions is met:
// 1) we are elected as leader (which allows us to resign)
// 2) another leader is elected
// 3) the shutdown timer expires
long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
} else if (state.isBackingOff()) {
if (state.isBackoffComplete(currentTimeMs)) {
logger.info("Re-elect as candidate after election backoff has completed");
transitionToCandidate(currentTimeMs);
return 0L;
}
return state.remainingBackoffMs(currentTimeMs);
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
long backoffDurationMs = binaryExponentialElectionBackoffMs(state.retries());
logger.debug("Election has timed out, backing off for {}ms before becoming a candidate again",
backoffDurationMs);
state.startBackingOff(currentTimeMs, backoffDurationMs);
return backoffDurationMs;
} else {
long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
return Math.min(minRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs));
}
}
Implement Raft Protocol for Metadata Quorum
KAFKA-10492; Core Kafka Raft Implementation (KIP-595)
kafka-raft