![点击阅读原文查看高清大图](https://img-blog.csdnimg.cn/6f57565da4ff4dceb219f6a416aa5900.png#pic_center)
🔥《Kafka运维管控平台LogiKM》🔥
✏️更强大的管控能力✏️
🎾更高效的问题定位能力🎾
🌅更便捷的集群运维能力🌅
🎼更专业的资源治理🎼
🌞更友好的运维生态🌞
[TOC]
目录
![点击阅读原文查看高清大图](https://img-blog.csdnimg.cn/6f57565da4ff4dceb219f6a416aa5900.png#pic_center)
思考几个问题
- 什么是分区状态机?
- 创建Topic的时候如何选举Leader?
- 分区的所有副本都不在线, 这个时候启动一台之前不在ISR内的副本,它会当选为Leader吗?
- 当所有副本都不在线,然后一个一个重启Broker上副本上线,谁会当选为Leader?谁先启动就谁当选吗?
- Broker下线了,Leader切换给了其他副本, 当Broker重启的时候,Leader会还给之前的副本吗?
- 选举成功的那一刻, 生产者和消费着都做了哪些事情?
- Leader选举期间对分区的影响
- Broker宕机了,会立即触发所有副本脱离ISR吗?
- Broker宕机了,为什么会又触发受控关机Leader选举策略、又会触发离线分区Leader选举策略?
分区Leader选举流程分析
在开始源码分析之前, 大家先看下面这张图, 好让自己对Leader选举有一个非常清晰的认知,然后再去看后面的源码分析文章,会更容易理解。
![点击阅读原文查看高清大图](https://img-blog.csdnimg.cn/01218fc276a24060bfcc89f125329fbe.png#pic_center)
整个流程分为三大块
- 触发选举场景 图左
- 执行选举流程 图中
- Leader选举策略 图右
分区状态机
首先大家得了解两个状态机
1. 分区状态机 控制分区状态流转
2. 副本状态机 控制副本状态流转
这里我们主要讲解分区状态机,这张图表示的是分区状态机
![分区状态机 (点击阅读原文查看高清大图)](https://img-blog.csdnimg.cn/7b93a3b3bbd6431d8a7ffa0c26e89233.png#pic_center)
NonExistentPartition :分区在将要被创建之前的初始状态是这个,表示不存在
NewPartition: 表示正在创建新的分区, 是一个中间状态, 这个时候只是在Controller的内存中存了状态信息
OnlinePartition: 在线状态, 正常的分区就应该是这种状态,只有在线的分区才能够提供服务
OfflinePartition: 下线状态, 分区可能因为Broker宕机或者删除Topic等原因流转到这个状态, 下线了就不能提供服务了
NonExistentPartition: 分区不存在的状态, 当Topic删除完成成功之后, 就会流转到这个状态, 当还处在删除中的时候,还是停留在下线状态。
我们今天要讲的Leader选举
就是在之前状态=>OnlinePartition状态的时候发生的。
Leader选举流程分析
源码入口:
PartitionStateMachine#electLeaderForPartitions
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
private def doHandleStateChanges( partitions: Seq[TopicPartition], targetState: PartitionState, partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = { targetState match { case OnlinePartition => if (partitionsToElectLeader.nonEmpty) { val electionResults = electLeaderForPartitions( partitionsToElectLeader, partitionLeaderElectionStrategyOpt.getOrElse( throw new IllegalArgumentException("Election strategy is a required field when the target state is OnlinePartition") ) )
} }
|
可以看到 我们最终是调用了doElectLeaderForPartitions 执行分区Leader选举。
PartitionStateMachine#doElectLeaderForPartitions
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| private def doElectLeaderForPartitions( partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy ): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition]) = { val getDataResponses = try { zkClient.getTopicPartitionStatesRaw(partitions) } val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]] val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)] getDataResponses.foreach { getDataResponse => val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition] val currState = partitionState(partition) if (getDataResponse.resultCode == Code.OK) { TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match { case Some(leaderIsrAndControllerEpoch) => if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) { } else { validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr } case None => } } else if (getDataResponse.resultCode == Code.NONODE) { } else { } }
if (validLeaderAndIsrs.isEmpty) { return (failedElections.toMap, Seq.empty) }
val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match { case OfflinePartitionLeaderElectionStrategy(allowUnclean) => val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState( validLeaderAndIsrs, allowUnclean ) leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty) case ReassignPartitionLeaderElectionStrategy => leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty) case PreferredReplicaPartitionLeaderElectionStrategy => leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty) case ControlledShutdownPartitionLeaderElectionStrategy => leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty) }
partitionsWithoutLeaders.foreach { electionResult => val partition = electionResult.topicPartition val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy" failedElections.put(partition, Left(new StateChangeFailedException(failMsg))) } val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr( adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion) finishedUpdates.foreach { case (partition, result) => result.right.foreach { leaderAndIsr => val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch) controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition, leaderIsrAndControllerEpoch, replicaAssignment, isNew = false) } }
(finishedUpdates ++ failedElections, updatesToRetry) }
|
总结一下上面的源码
- 去zookeeper节点
/broker/topics/{topic名称}/partitions/{分区号}/state
节点读取基本信息。
- 遍历从zk中获取的leaderIsrAndControllerEpoch信息,做一些简单的校验:zk中获取的数据的controllerEpoch必须<=当前的Controller的controller_epoch。最终得到 validLeaderAndIsrs, controller_epoch 就是用来防止脑裂的, 当有两个Controller当选的时候,他们的epoch肯定不一样, 那么最新的epoch才是真的Controller
- 如果没有获取到有效的validLeaderAndIsrs 信息 则直接返回
- 根据入参partitionLeaderElectionStrategy 来匹配不同的Leader选举策略。来选出合适的Leader和ISR信息
- 根据上面的选举策略选出的 LeaderAndIsr 信息进行遍历, 将它们一个个写入到zookeeper节点
/broker/topics/{topic名称}/partitions/{分区号}/state
中。 (当然如果上面没有选择出合适的leader,那么久不会有这个过程了)
注意: 当没有选出Leader的时候,那么是不会去更新ZK中的Leader和ISR信息的。
那么你应该会问,这种情况下Leader没有选出,那么就应该更新为-1呀!那么是在什么时候去更新的呢?
- 遍历上面写入zk成功的分区, 然后更新Controller里面的分区leader和isr的内存信息 并发送LeaderAndISR请求,通知对应的Broker Leader更新了。
![Leader选举流程 (点击阅读原文查看高清大图)](https://img-blog.csdnimg.cn/3d4a39fba6a54e78ab4dfddc8665b660.png#pic_center)
注意: 当没有选出Leader的时候,那么是不会去更新ZK中的Leader和ISR信息的。
那么你应该会问,这种情况下Leader没有选出,那么就应该更新为-1呀!那么是在什么时候去更新的呢?
上面我们只是分析了Leader选举的流程,但是在调用的地方是分区状态的流转触发了Leader选举。
分区状态有流转那么就会伴随着副本的状态流转。比如 副本状态流转到 OfflineReplica
副本离线的时候就有可能触发我们去更新 zk中的节点信息
KafkaController#onReplicasBecomeOffline
1 2 3 4 5 6 7 8 9 10
| private def onReplicasBecomeOffline(newOfflineReplicas: Set[PartitionAndReplica]): Unit = { partitionStateMachine.triggerOnlinePartitionStateChange() replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)
}
|
下面的关机代码就是更新zk节点数据的计算方式
ReplicaStateMachine#doRemoveReplicasFromIsr
1 2 3 4 5 6 7 8 9 10
| val adjustedLeaderAndIsrs: Map[TopicPartition, LeaderAndIsr] = leaderAndIsrsWithReplica.flatMap { case (partition, result) => result.toOption.map { leaderAndIsr => val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId) partition -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr) } }
|
从代码我们可以看出来, 当 :
当前要离线的副本==Leader, 那么这个新Leader == -1;否则新Leader还是等于原Leader
当Isr的数量只剩下1个的时候,那么ISR等于原ISR,否则新ISR=(原ISR-当前的被离线的副本)
这里的逻辑是不是就解释清楚了我们一开始的问题
什么时候Leader会设置为-1
看上面的Leader选举策略是不是很简单, 总结一下就是:
分区状态变更, 分区状态变更的时候会触发Leader选举,选举成功会去zk修改节点brokers/topics/{Topic}/partitions/{分区}/state
值
分区状态变更,一般伴随着副本状态的变更,副本状态的变更也会触发去修改zk节点brokers/topics/{Topic}/partitions/{分区}/state
一般如果Leader选举的时候选举出来了新的LeaderAndISR,那么副本变更的时候修改zk的值基本不变。
但是如果Leader选举的时候没有选举LeaderAndIsr,那么在副本状态变更这一步骤
就会将zk节点的leader设置为-1. 设置的逻辑就是上面提过的:
当前要离线的副本==Leader, 那么这个新Leader == -1;否则新Leader还是等于原Leader
当Isr的数量只剩下1个的时候,那么ISR等于原ISR,否则新ISR=(原ISR-当前的被离线的副本)
上面分析了Leader选举策略流程,但是中间究竟是如何选择Leader的?
这个是根据传入的策略类型, 来做不同的选择
那么有哪些策略呢?以及什么时候触发这些选举呢?
分区的几种策略以及对应的触发场景
1. OfflinePartitionLeaderElectionStrategy
遍历分区的AR, 找到第一个满足以下条件的副本:
- 副本在线
- 在ISR中。
如果找不到满足条件的副本,那么再根据 传入的参数allowUnclean判断
- allowUnclean=true:AR顺序中所有在线副本中的第一个副本。
- allowUnclean=false: 需要去查询配置
unclean.leader.election.enable
的值。
若=true ,则跟上面 1一样 。
若=false,直接返回None,没有找到合适的Leader。
![离线分区Leader选举策略 (点击阅读原文查看高清大图)](https://img-blog.csdnimg.cn/6f339261dc75440daa6fa3d7fdb339e6.png)
源码位置:
Election#leaderForOffline
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
|
case OfflinePartitionLeaderElectionStrategy(allowUnclean) => val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState( validLeaderAndIsrs, allowUnclean ) leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)
private def leaderForOffline(partition: TopicPartition, leaderAndIsrOpt: Option[LeaderAndIsr], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): ElectionResult = {
val assignment = controllerContext.partitionReplicaAssignment(partition) val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition)) leaderAndIsrOpt match { case Some(leaderAndIsr) => val isr = leaderAndIsr.isr val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection( assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext) val newLeaderAndIsrOpt = leaderOpt.map { leader => val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition)) else List(leader) leaderAndIsr.newLeaderAndIsr(leader, newIsr) } ElectionResult(partition, newLeaderAndIsrOpt, liveReplicas)
case None => ElectionResult(partition, None, liveReplicas) } }
object PartitionLeaderElectionAlgorithms { def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = { assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse { if (uncleanLeaderElectionEnabled) { val leaderOpt = assignment.find(liveReplicas.contains) if (leaderOpt.isDefined) controllerContext.stats.uncleanLeaderElectionRate.mark() leaderOpt } else { None } } }
```` 1. 先组装所有给定的 **validLeaderAndIsrs** 的信息 其实主要还是要去获取每个Topic的对应的`unclean.leader.election.enable` 属性值。 默认情况下,我们调用到这里的时候 这个入参`allowUnclean=false`. **如果是false** 那我们需要去查询一下指定的topic它的属性`unclean.leader.election.enable` 是什么 **如果是true** 则表示直接覆盖了`unclean.leader.election.enable`的配置为true。 ![在这里插入图片描述](https:
2. 找到 第一个满足条件:**副本在线** && 在 **ISR中的副本**。 3. 如果没有满足条件的 则判断入**uncleanLeaderElectionEnabled**的配置 如果是true,则从不在isr中的存活副本中获取副本作为leader。 当然这个**uncleanLeaderElectionEnabled** 参数是上 步骤1中决定的。
#### 触发场景:Controller 重新加载 > Controller 当选的时候会启动 **分区状态机** `partitionStateMachine`, 启动的时候会重新加载所有分区的状态到内存中, 并触发 对处于 **NewPartition** 或 **OfflinePartition** 状态的所有分区尝试变更为 **OnlinePartition** 状态的状态。把新创建的分区和离线的分区触发一下选举流程啊 > > 触发源码入口:
**KafkaController#onControllerFailover**
`partitionStateMachine.startup()`
```scala
partitionStateMachine.triggerOnlinePartitionStateChange()
|
加szzdzhp001,领取全部kafka知识图谱
触发场景:脚本执行脏选举
当执行 kafka-leader-election.sh
的时候并且 模式选择的是UNCLEAN
. 则会触发这个模式。
这里注意一下,入参allowUnclean
= (electionTrigger == AdminClientTriggered)
意思是: 当触发的场景是AdminClientTriggered的时候, 则allowUnclean=true
,表示 不关心配置参数 unclean.leader.election.enable
是什么, 如果没有找到符合条件的Leader, 则就去非ISR 列表找Leader。
刚好 我能脚本执行的时候 触发器就是 AdminClientTriggered
其他触发器有:
AutoTriggered : 定时自动触发。
ZkTriggered:Controller切换的时候触发的(zk节点/controller 的变更便是Controller角色的切换)
AdminClientTriggered:客户端主动触发。
触发场景:Controller 监听到有Broker启动了
同上。
触发源码入口:
KafkaController#processBrokerChange#onBrokerStartup
1
| partitionStateMachine.triggerOnlinePartitionStateChange()
|
触发场景:Controller 监听 LeaderAndIsrResponseReceived请求
同上。
当Controller向对应的Broker发起 LeaderAndIsrRequest 请求的时候.
有一个回调函数callback, 这个回调函数会向Controller发起一个事件为 LeaderAndIsrResponseReceived 请求。
具体源码在:
ControllerChannelManager#sendLeaderAndIsrRequest
![在这里插入图片描述](https://img-blog.csdnimg.cn/afa59506c2f84ccbacae95773ab34c34.png)
Controller收到这个事件的请求之后,根据返回的 leaderAndIsrResponse 数据
会判断一下有没有新增加的离线副本(一般都是由于磁盘访问有问题)
如果有新的离线副本,则需要将这个离线副本标记为Offline状态
触发场景:Broker宕机,Controller监听到了变更
源码入口:
KafkaController#onReplicasBecomeOffline
1
| partitionStateMachine.triggerOnlinePartitionStateChange()
|
这里会先找那些Leader副本已经离线了或者正在等待关机的Broker中的副本。
然后对这些分区触发状态变更,将他们的分区状态变成离线状态 也就是:OfflinePartition
再调用partitionStateMachine.triggerOnlinePartitionStateChange()
就会对上述分区重新选举了。
或许你会问:
为啥Broker关机的时候会执行两次选举策略?一个是ControlledShutdownPartitionLeaderElectionStrategy,接着就是OfflinePartitionLeaderElectionStrategy。
其实执行两次选举并不冲突! 第一次一个是ControlledShutdownPartitionLeaderElectionStrategy 选举会将大部分的分区成功切换到新的Leader。 但是对于如果刚好副本数量==1的话,是不会执行ControlledShutdownPartitionLeaderElectionStrategy策略选举的。
并且当broker的配置 controlled.shutdown.enable=false
. 那么这里的受控关机选举策略就不会执行了。
接下来就是执行OfflinePartitionLeaderElectionStrategy选举策略, 补充一下上述没有成功切换的分区。
加szzdzhp001,领取全部kafka知识图谱
触发场景:Controller 监听 UncleanLeaderElectionEnable请求
当我们在修改动态配置的时候, 将动态配置:unclean.leader.election.enable
设置为 true 的时候
会触发向Controller发起UncleanLeaderElectionEnable的请求,这个时候则需要触发一下。触发请求同上。
触发源码入口:
KafkaController#processTopicUncleanLeaderElectionEnable
1
| partitionStateMachine.triggerOnlinePartitionStateChange(topic)
|
上面的触发调用的代码就是下面的接口
对处于 NewPartition 或 OfflinePartition 状态的所有分区尝试变更为
OnlinePartition 的状态。 状态的流程触发了Leader选举。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
def triggerOnlinePartitionStateChange(): Unit = { val partitions = controllerContext.partitionsInStates(Set(OfflinePartition, NewPartition)) triggerOnlineStateChangeForPartitions(partitions) }
private def triggerOnlineStateChangeForPartitions(partitions: collection.Set[TopicPartition]): Unit = { val partitionsToTrigger = partitions.filter { partition => !controllerContext.isTopicQueuedUpForDeletion(partition.topic) }.toSeq
handleStateChanges(partitionsToTrigger, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy(false))) }
|
- 获取所有 OfflinePartition 、NewPartition 的分区状态
- 尝试将 所有 NewPartition or OfflinePartition 状态的分区全部转别成 OnlinePartition状态,
但是如果对应的Topic正在删除中,则会被排除掉
- 分区状态机进行状态流转 使用 OfflinePartitionLeaderElectionStrategy 选举策略(
allowUnclean=true
表示如果从isr中没有选出leader,则允许从非isr列表中选举leader ,allowUnclean=false
表示如果从isr中没有选出leader, 则需要去读取配置文件的配置 unclean.leader.election.enable
来决定是否允许从非ISR列表中选举Leader。 )
加szzdzhp001,领取全部kafka知识图谱
2. ReassignPartitionLeaderElectionStrategy
分区副本重分配选举策略:
当执行分区副本重分配的时候, 原来的Leader可能有变更, 则需要触发一下 Leader选举。
- 只有当之前的Leader副本在经过重分配之后不存在了。
例如: [2,0] ==> [1,0] 。 原来2是Leader副本,经过重分配之后变成了 [1,0]。2已经不复存在了,所以需要重新选举Leader。
- 当原来的分区Leader副本 因为某些异常,下线了。需要重新选举Leader
![](https://img-blog.csdnimg.cn/2c28efe2a50c437fa4ec7ec88f1eaed2.png)
分区副本重分配发生的Leader选举.
Election#leaderForReassign
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| private def leaderForReassign(partition: TopicPartition, leaderAndIsr: LeaderAndIsr, controllerContext: ControllerContext): ElectionResult = { val targetReplicas = controllerContext.partitionFullReplicaAssignment(partition).targetReplicas val liveReplicas = targetReplicas.filter(replica => controllerContext.isReplicaOnline(replica, partition)) val isr = leaderAndIsr.isr val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(targetReplicas, isr, liveReplicas.toSet) val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader)) ElectionResult(partition, newLeaderAndIsrOpt, targetReplicas) }
def reassignPartitionLeaderElection(reassignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = { reassignment.find(id => liveReplicas.contains(id) && isr.contains(id)) }
|
总结:
从当前的副本分配列表中,获取副本在线&&副本在ISR中的 第一个副本,遍历的顺序是当前副本的分配方式(AR),跟ISR的顺序没有什么关系。
加szzdzhp001,领取全部kafka知识图谱
触发场景:分区副本重分配
并不是每次执行分区副本重分配都会触发这个Leader选举策略, 下面两种情况才会触发
- 只有当之前的Leader副本在经过重分配之后不存在了。例如: [2,0] ==> [1,0] 。 原来2是Leader副本,经过重分配之后变成了 [1,0]。2已经不复存在了,所以需要重新选举Leader。
- 当原来的分区Leader副本 因为某些异常,下线了。需要重新选举Leader
对应的判断条件代码如下:
KafkaController#moveReassignedPartitionLeaderIfRequired
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition, newAssignment: ReplicaAssignment): Unit = { val reassignedReplicas = newAssignment.replicas val currentLeader = controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader
if (!reassignedReplicas.contains(currentLeader)) { partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy)) } else if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) { updateLeaderEpochAndSendRequest(topicPartition, newAssignment) } else { partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy)) } }
|
![在这里插入图片描述](https://img-blog.csdnimg.cn/8c77ed6734ee490086ab16ca5b25a518.png#pic_center)
点击查看分区重分配的源码解析
3. PreferredReplicaPartitionLeaderElectionStrategy
优先副本选举策略, 必须满足三个条件:
是第一个副本&&副本在线&&副本在ISR列表中。
满足上面三个条件才会当选leader,不满足则不会做变更。
![优先副本选举 (点击阅读原文看高清大图)](https://img-blog.csdnimg.cn/686239f8a7eb47c5a84146e8931a5a69.png)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| def leaderForPreferredReplica(controllerContext: ControllerContext, leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = { leaderAndIsrs.map { case (partition, leaderAndIsr) => leaderForPreferredReplica(partition, leaderAndIsr, controllerContext) } }
private def leaderForPreferredReplica(partition: TopicPartition, leaderAndIsr: LeaderAndIsr, controllerContext: ControllerContext): ElectionResult = { val assignment = controllerContext.partitionReplicaAssignment(partition) val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition)) val isr = leaderAndIsr.isr val leaderOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas.toSet) val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader)) ElectionResult(partition, newLeaderAndIsrOpt, assignment) }
def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = { assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id)) }
|
- 从内存中获取TopicPartition的分配方式
- 过滤不在线的副本
- 找到第一个副本判断一下是否在线&&在ISR列表中。如果满足,则选他为leader,如果不满足,也不会再找其他副本了。
- 返回leaderAndIsr信息, 这里的ISR是没有做修改的。
加szzdzhp001,领取全部kafka知识图谱
触发场景:自动定时执行优先副本选举任务
Controller 启动的时候,会启动一个定时任务 。每隔一段时间就去执行 优先副本选举任务。
与之相关配置:
1 2 3 4 5 6 7 8 9 10 11 12
|
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds partition = 300
leader.imbalance.per.broker.percentage = 10
|
触发场景: Controller 重新加载的时候
在这个触发之前还有执行
partitionStateMachine.startup()
相当于是先把 OfflinePartition、NewPartition状态的分区执行了OfflinePartitionLeaderElectionStrategy 策略。
然后又执行了
PreferredReplicaPartitionLeaderElectionStrategy策略
这里是从zk节点 /admin/preferred_replica_election
读取数据, 来进行判断是否有需要执行Leader选举的分区
它是在执行kafka-preferred-replica-election
命令的时候会创建这个zk节点
但是这个已经被标记为废弃了,并且在3.0的时候直接移除了。
源码位置:
KafkaController#onControllerFailover
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections() onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered) private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = { val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition => val replicas = controllerContext.partitionReplicaAssignment(partition) val topicDeleted = replicas.isEmpty val successful = if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicas.head else false successful || topicDeleted } val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElection val pendingPreferredReplicaElectionsSkippedFromTopicDeletion = pendingPreferredReplicaElectionsIgnoringTopicDeletion.filter(partition => topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) val pendingPreferredReplicaElections = pendingPreferredReplicaElectionsIgnoringTopicDeletion -- pendingPreferredReplicaElectionsSkippedFromTopicDeletion pendingPreferredReplicaElections }
|
触发场景:执行优先副本选举脚本的时候
执行脚本 kafka-leader-election.sh
并且选择的模式是 PREFERRED
(优先副本选举)
则会选择 PreferredReplicaPartitionLeaderElectionStrategy 策略选举
4. ControlledShutdownPartitionLeaderElectionStrategy
受控关机选举策略 :
当Broker关机的过程中,会向Controller发起一个请求, 让它重新发起一次选举, 把在所有正在关机(也就是发起请求的那个Broker,或其它同时正在关机的Broker) 的Broker里面的副本给剔除掉。
根据算法算出leader:找到第一个满足条件的副本:
副本在线 && 副本在ISR中 && 副本所在的Broker不在正在关闭的Broker集合中 。
构造新的ISR列表: 在之前的isr列表中将 正在被关闭的Broker里面的副本 给剔除掉
![受控关机Leader选举策略 (点击阅读原文查看高清大图)](https://img-blog.csdnimg.cn/d820d510ea3b4599a56c0d0f6234331e.png)
加szzdzhp001,领取全部kafka知识图谱
Election#leaderForControlledShutdown
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
def leaderForControlledShutdown(controllerContext: ControllerContext, leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = { val shuttingDownBrokerIds = controllerContext.shuttingDownBrokerIds.toSet leaderAndIsrs.map { case (partition, leaderAndIsr) => leaderForControlledShutdown(partition, leaderAndIsr, shuttingDownBrokerIds, controllerContext) } } } private def leaderForControlledShutdown(partition: TopicPartition, leaderAndIsr: LeaderAndIsr, shuttingDownBrokerIds: Set[Int], controllerContext: ControllerContext): ElectionResult = { val assignment = controllerContext.partitionReplicaAssignment(partition) val liveOrShuttingDownReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition, includeShuttingDownBrokers = true)) val isr = leaderAndIsr.isr val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr, liveOrShuttingDownReplicas.toSet, shuttingDownBrokerIds) val newIsr = isr.filter(replica => !shuttingDownBrokerIds.contains(replica)) val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeaderAndIsr(leader, newIsr)) ElectionResult(partition, newLeaderAndIsrOpt, liveOrShuttingDownReplicas) } def controlledShutdownPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], shuttingDownBrokers: Set[Int]): Option[Int] = { assignment.find(id => liveReplicas.contains(id) && isr.contains(id) && !shuttingDownBrokers.contains(id)) }
|
触发场景:Broker关机的时候
当Broker关闭的时候, 会向Controller发一起一个ControlledShutdownRequest
请求, Controller收到这个请求会针对性的做一些善后事件。比如说 执行Leader重选举 等等之类的。
这里的选举会将副本数量==1的分区给过滤掉,不会对它进行重选举
源码位置:KafkaServer#controlledShutdown
Controller收到请求的源码位置:KafkaController#doControlledShutdown
与之相关的配置有:
1 2 3
| controlled.shutdown.enable : 是否启用受控关闭操作 controlled.shutdown.max.retries 受控关机操作 最大重试的次数 controlled.shutdown.retry.backoff.ms 失败后等等多久再次重试
|
![在这里插入图片描述](https://img-blog.csdnimg.cn/301eb19fb043431c9ca8dd0ca07fdffb.png#pic_center)
其他场景
新创建的Topic Leader选举策略
创建新的Topic的时候,并没有发生Leader选举的操作, 而是默认从分区对应的所有在线副本中选择第一个为leader, 然后isr就为 所有在线副本,再组装一下当前的controller_epoch信息,写入到zk节点/brokers/topics/{Topic名称}/partitions/{分区号}/state
中。
最后发起 LeaderAndIsrRequest 请求,通知 leader 的变更。
详细看看源码:
PartitionStateMachine#doHandleStateChanges
分区状态从 NewPartition
流转到OnlinePartition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
|
private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = { val successfulInitializations = mutable.Buffer.empty[TopicPartition] val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition)) val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) => val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition)) partition -> liveReplicasForPartition } val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }
partitionsWithoutLiveReplicas.foreach { case (partition, replicas) => val failMsg = s"Controller $controllerId epoch ${controllerContext.epoch} encountered error during state change of " + s"partition $partition from New to Online, assigned replicas are " + s"[${replicas.mkString(",")}], live brokers are [${controllerContext.liveBrokerIds}]. No assigned " + "replica is alive." logFailedStateChange(partition, NewPartition, OnlinePartition, new StateChangeFailedException(failMsg)) } val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) => val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch) partition -> leaderIsrAndControllerEpoch }.toMap val createResponses = try { zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs, controllerContext.epochZkVersion) } catch { case e: ControllerMovedException => error("Controller moved to another broker when trying to create the topic partition state znode", e) throw e case e: Exception => partitionsWithLiveReplicas.foreach { case (partition,_) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) } Seq.empty } createResponses.foreach { createResponse => val code = createResponse.resultCode val partition = createResponse.ctx.get.asInstanceOf[TopicPartition] val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochs(partition) if (code == Code.OK) { controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(leaderIsrAndControllerEpoch.leaderAndIsr.isr, partition, leaderIsrAndControllerEpoch, controllerContext.partitionFullReplicaAssignment(partition), isNew = true) successfulInitializations += partition } else { logFailedStateChange(partition, NewPartition, OnlinePartition, code) } } successfulInitializations }
|
- 从当前的Controller 内存中获取所有入参的分区对应的副本信息
- 过滤那些已经下线的副本( Broker宕机、网络异常、磁盘脱机、等等都有可能造成副本下线) 。
- 每个分区对应的所有在线副本信息 为 ISR 信息,然后取ISR的第一个副本为leader分区。当然特别注意一下, 这个时候获取的isr信息的顺序就是 分区创建时候分配好的AR顺序, 获取第一个在线的。(因为在其他情况下 ISR的顺序跟AR的顺序并不一致)
- 组装 上面的
isr
、leader
、controller_epoch
等信息 写入到zk节点 /brokers/topics/{Topic名称}/partitions/{分区号}/state
例如下面所示 1
| {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1,2]}
|
- 然后向其他相关Broker 发起 LeaderAndIsrRequest 请求,通知他们Leader和Isr信息已经变更了,去做一下想要的处理。比如去新的leader发起Fetcher请求同步数据。
可以看看之前我们分析过的 Topic创建的源码解析 的原理图 如下
![Topic创建流程时序图](https://img-blog.csdnimg.cn/03c1df5d7859442b951f8801062df893.png)
重点看:
![4.4.1 已经确定Leader是谁了](https://img-blog.csdnimg.cn/f124461df9d941c4abe7e5201b1c6c72.png)
回答上面的问题
现在,看完全文之后,我想你应该对下面的问题很清楚了吧!
什么是分区状态机
所有的分区状态的流转都是通过分区状态机来进行的, 统一管理! 每个分区状态的流转 都是有严格限制并且固定的,流转到不同状态需要执行的操作不一样, 例如 当分区状态流转到 OnlinePartition 的时候, 就需要判断是否需要执行 Leader选举 ,
![分区状态机](https://img-blog.csdnimg.cn/7b93a3b3bbd6431d8a7ffa0c26e89233.png#pic_center)
创建Topic的时候如何选举Leader?
创建Topic的时候并没有发生 Leader选举, 而是默认将 在线的第一个副本设置为Leader,所有在线的副本列表 为 ISR 列表。 写入到了zookeeper中。
加szzdzhp001,领取全部kafka知识图谱
分区的所有副本都不在线, 这个时候启动一台之前不在ISR内的副本的Broker,它会当选为Leader吗?
视情况而定。
首先, 启动一台Broker, 会用什么策略选举?
看上面的图,我们可以知道是
OfflinePartitionLeaderElectionStrategy
然后看下这个策略是如何选举的?
![点击阅读原文查看高清大图](https://img-blog.csdnimg.cn/dcc01ecc3b8449c3aea858430aa92ad6.png)
那么最终结果就是:
所有副本不在线,那么一个Leader的候选者都当选不了
那么这个时候就会判断 unclean.leader.election.enable
配置是否为true.
如果是true, 则当前在线的副本就是只有自己这个刚启动的在线副本,自然而然就会当选Leader了。
如果是fase, 则没有副本能够当前Leader, 次数处于一个无Leader的状态。
当所有副本都不在线,然后一个一个重启Broker上副本上线,谁会当选为Leader?谁先启动就谁当选吗?
不是, 跟上一个问题同理
根据 unclean.leader.election.enable
配置决定。
如果是true, 则谁先启动,谁就当选(会丢失部分数据)
如果是false,则第一个在ISR列表中的副本当选。
顺便再提一句, 虽然在这里可能不是AR中的第一个副本当选Leader。
但是最终还是会自动执行Leader均衡的,自动均衡使用的策略是
PreferredReplicaPartitionLeaderElectionStrategy
(前提是开启了自动均衡: auto.leader.rebalance.enable=true
)
加szzdzhp001,领取全部kafka知识图谱
Broker下线了,Leader切换给了其他副本, 当Broker重启的时候,Leader会还给之前的副本吗?
根据配置 auto.leader.rebalance.enable=true
决定。
true: 会自动执行Leader均衡, 自动均衡策略是 PreferredReplicaPartitionLeaderElectionStrategy 策略
false: 不执行自动均衡。 那么久不会还回去。
关于更详细的 Leader均衡机制请看 Leader 均衡机制
加szzdzhp001,领取全部kafka知识图谱
Leader选举期间对分区的影响
Leader的选举基本上不会造成什么影响, Leader的切换非常快, 每个分区不可用的时间在几毫秒内。