删除Topic源码解析(附教学视频)
kafka管控推荐使用 滴滴开源 的 Kafka运维管控平台 更符合国人的操作习惯 ,
更强大的管控能力 ,更高效的问题定位能力 、更便捷的集群运维能力 、更专业的资源治理 、 更友好的运维生态
配套视频,请食用
阅读本文之前 你可以先思考一下以下几个问题,然后再带着问题去阅读本文
- 什么时候在/admin/delete_topics写入节点的?
- 什么时候真正执行删除Topic磁盘日志?
- Controller通知Brokers 执行StopReplica是通知所有的Broker还是只通知跟被删除Topic有关联的Broker?
- 删除过程有Broker不在线 或者执行失败怎么办
- 在重新分配的过程中,如果执行删除操作会怎么样
- 如果正在删除中发生了分区副本重分配的操作怎么办?
- 如果直接删除ZK上的
/brokers/topics/{topicName}
节点会怎样
删除Topic命令
bin/kafka-topics.sh –bootstrap-server localhost:9092 –delete –topic test
支持正则表达式匹配Topic来进行删除,只需要将topic 用双引号包裹起来
例如: 删除以create_topic_byhand_zk
为开头的topic;
bin/kafka-topics.sh –bootstrap-server localhost:9092 –delete –topic “create_topic_byhand_zk.*”
.
表示任意匹配除换行符 \n 之外的任何单字符。要匹配 . ,请使用 . 。·*·
:匹配前面的子表达式零次或多次。要匹配 * 字符,请使用 *。.*
: 任意字符
删除任意Topic (慎用)
bin/kafka-topics.sh –bootstrap-server localhost:9092 –delete –topic “.*?”
更多的用法请参考正则表达式
相关配置
配置 | 描述 | 默认 |
---|---|---|
file.delete.delay.ms | topic删除被标记为–delete文件之后延迟多长时间删除正在的Log文件 | 60000 |
delete.topic.enable | 是否能够删除topic | true |
源码解析
如果觉得阅读源码解析太枯燥,请直接看 源码总结及其后面部分
1. 客户端发起删除Topic的请求
在【kafka源码】TopicCommand之创建Topic源码解析 里面已经分析过了整个请求流程; 所以这里就不再详细的分析请求的过程了,直接看重点;
向Controller发起 deleteTopics
请求
2. Controller处理deleteTopics的请求
KafkaApis.handle
AdminManager.deleteTopics
1 | /** |
- zk中写入数据topic
/admin/delete_topics/Topic名称
; 标记要被删除的Topic - 如果客户端传过来的timeout<=0或者 写入zk数据过程异常了 则直接返回异常
3. Controller监听zk变更 执行删除Topic流程
KafkaController.processTopicDeletion
1 | private def processTopicDeletion(): Unit = { |
- 如果
/admin/delete_topics/
下面的节点有不存在的Topic,则清理掉 - 如果配置了
delete.topic.enable=false
不可删除Topic的话,则将/admin/delete_topics/
下面的节点全部删除,然后流程结束 delete.topic.enable=true
; 将主题标记为不符合删除条件,放到topicsIneligibleForDeletion
中; 不符合删除条件的是:Topic分区正在进行分区重分配- 将Topic添加到删除Topic列表
topicsToBeDeleted
中; - 然后调用
TopicDeletionManager.resumeDeletions()
方法执行删除操作
3.1 resumeDeletions 执行删除方法
TopicDeletionManager.resumeDeletions()
1 | private def resumeDeletions(): Unit = { |
重点看看
onTopicDeletion
方法,标记所有待删除分区;向Brokers发送updateMetadataRequest
请求,告知Brokers这个主题正在被删除,并将Leader设置为LeaderAndIsrLeaderDuringDelete
;- 将待删除的Topic的所有分区,执行分区状态机的转换 ;当前状态–>
OfflinePartition
->NonExistentPartition
; 这两个状态转换只是在当前Controller内存中更新了一下状态; 关于状态机请看 【kafka源码】Controller中的状态机TODO….; client.sendMetadataUpdate(topics.flatMap(controllerContext.partitionsForTopic))
向待删除Topic分区发送UpdateMetadata
请求; 这个时候更新了什么数据呢?
看上面图片源码, 发送UpdateMetadata
请求的时候把分区的Leader= -2; 表示这个分区正在被删除;那么所有正在被删除的分区就被找到了;拿到这些待删除分区之后干嘛呢?- 更新一下限流相关信息
- 调用
groupCoordinator.handleDeletedPartitions(deletedPartitions)
: 清除给定的deletedPartitions
的组偏移量以及执行偏移量删除的函数;就是现在该分区不能提供服务啦,不能被消费啦
详细请看 Kafka的元数据更新UpdateMetadata
- 调用
TopicDeletionManager.onPartitionDeletion
接口如下;
- 将待删除的Topic的所有分区,执行分区状态机的转换 ;当前状态–>
3.2 TopicDeletionManager.onPartitionDeletion
- 将所有Dead replicas 副本直接移动到
ReplicaDeletionIneligible
状态,如果某些副本已死,也将相应的主题标记为不适合删除,因为它无论如何都不会成功完成 - 副本状态转换成
OfflineReplica
; 这个时候会对该Topic的所有副本所在Broker发起StopReplicaRequest
请求;(参数deletePartitions = false
,表示还不执行删除操作); 以便他们停止向Leader
发送fetch
请求; 关于状态机请看 【kafka源码】Controller中的状态机TODO….; - 副本状态转换成
ReplicaDeletionStarted
状态,这个时候会对该Topic的所有副本所在Broker发起StopReplicaRequest
请求;(参数deletePartitions = true
,表示执行删除操作)。这将发送带有 deletePartition=true 的StopReplicaRequest
。并将删除相应分区的所有副本中的所有持久数据
4. Brokers 接受StopReplica请求
最终调用的是接口ReplicaManager.stopReplica
==> LogManager.asyncDelete
将给定主题分区“logdir”的目录重命名为“logdir.uuid.delete”,并将其添加到删除队列中
例如 :
1 | def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false): Log = { |
4.1 日志清理定时线程
上面我们知道最终是将待删除的Log添加到了
logsToBeDeleted
这个队列中; 这个队列就是待删除Log队列,有一个线程kafka-delete-logs
专门来处理的;我们来看看这个线程怎么工作的
LogManager.startup
启动的时候 ,启动了一个定时线程
1 | scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period |
删除日志的线程
1 | /** |
file.delete.delay.ms
决定延迟多久删除
5.StopReplica 请求成功 执行回调接口
Topic删除完成, 清理相关信息
触发这个接口的地方是: 每个Broker执行删除StopReplica
成功之后,都会执行一个回调函数;TopicDeletionStopReplicaResponseReceived
; 当然调用方是Controller,回调到的也就是Controller;
传入回调函数的地方
执行回调函数 KafkaController.processTopicDeletionStopReplicaResponseReceived
如果回调有异常,删除失败则将副本状态转换成==》
ReplicaDeletionIneligible
,并且重新执行resumeDeletions
方法;如果回调正常,则变更状态
ReplicaDeletionStarted
==》ReplicaDeletionSuccessful
;并且重新执行resumeDeletions
方法;resumeDeletions
方法会判断所有副本是否均被删除,如果全部删除了就会执行下面的completeDeleteTopic
代码;否则会继续删除未被成功删除的副本1
2
3
4
5
6
7
8
9
10
11
12private def completeDeleteTopic(topic: String): Unit = {
// deregister partition change listener on the deleted topic. This is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
client.mutePartitionModifications(topic)
val replicasForDeletedTopic = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine.handleStateChanges(replicasForDeletedTopic.toSeq, NonExistentReplica)
controllerContext.topicsToBeDeleted -= topic
controllerContext.topicsWithDeletionStarted -= topic
client.deleteTopic(topic, controllerContext.epochZkVersion)
controllerContext.removeTopic(topic)
}- 清理内存中相关信息
- 取消注册被删除Topic的相关节点监听器;节点是
/brokers/topics/Topic名称
- 删除zk中的数据包括;
/brokers/topics/Topic名称
、/config/topics/Topic名称
、/admin/delete_topics/Topic名称
6. Controller启动时候 尝试继续处理待删除的Topic
我们之前分析Controller上线的时候有看到KafkaController.onControllerFailover
以下省略部分代码
1 | private def onControllerFailover(): Unit = { |
6.1 获取需要被删除的Topic和暂时不能删除的Topic
fetchTopicDeletionsInProgress
topicsToBeDeleted
所有需要被删除的Topic从zk中/admin/delete_topics
获取topicsIneligibleForDeletion
有一部分Topic还暂时不能被删除:
①. Topic任意分区正在进行副本重分配
②. Topic任意分区副本存在不在线的情况(只有topic有一个副本所在的Broker异常就不能能删除)③. Topic的所有副本存在数据盘脱机的情况(replicasOnOfflineDirs)
将得到的数据存在在
controllerContext
内存中
6.2 topicDeletionManager.init初始化删除管理器
- 如果服务器配置
delete.topic.enable=false
不允许删除topic的话,则删除/admin/delete_topics
中的节点; 这个节点下面的数据是标记topic需要被删除的意思;
6.3 topicDeletionManager.tryTopicDeletion尝试恢复删除
这里又回到了上面分析过的resumeDeletions
啦;恢复删除操作
1 | def tryTopicDeletion(): Unit = { if (isDeleteTopicEnabled) { resumeDeletions() } } |
源码总结
整个Topic删除, 请看下图
几个注意点:
- Controller 也是Broker
- Controller发起删除请求的时候,只是跟相关联的Broker发起删除请求;
- Broker不在线或者删除失败,Controller会持续进行删除操作; 或者Broker上线之后继续进行删除操作
Q&A
列举在此主题下比较常见的问题; 如果读者有其他问题可以在评论区评论, 博主会不定期更新
什么时候在/admin/delete_topics写入节点的
客户端发起删除操作deleteTopics的时候,Controller响应deleteTopics请求, 这个时候Controller就将待删除Topic写入了zk的
/admin/delete_topics/Topic名称
节点中了;
什么时候真正执行删除Topic磁盘日志
Controller监听到zk节点
/admin/delete_topics
之后,向所有存活的Broker发送删除Topic的请求; Broker收到请求之后将待删除副本标记为–delete后缀; 然后会有专门日志清理现场来进行真正的删除操作; 延迟多久删除是靠file.delete.delay.ms
来决定的;默认是60000毫秒 = 一分钟
为什么正在重新分配的Topic不能被删除
正在重新分配的Topic,你都不知道它具体会落在哪个地方,所以肯定也就不知道啥时候删除啊;
等分配完毕之后,就会继续删除流程
如果在/admin/delete_topics/
中手动写入一个节点会不会正常删除
如果写入的节点,并不是一个真实存在的Topic;则将会直接被删除
当然要注意如果配置了delete.topic.enable=false
不可删除Topic的话,则将/admin/delete_topics/
下面的节点全部删除,然后流程结束
如果写入的节点是一个真实存在的Topic; 则将会执行删除Topic的流程; 本质上跟用Kafka客户端执行删除Topic操作没有什么不同
如果直接删除ZK上的/brokers/topics/{topicName}
节点会怎样
TODO…
Controller通知Brokers 执行StopReplica是通知所有的Broker还是只通知跟被删除Topic有关联的Broker?
只是通知跟被删除Topic有关联的Broker;
请看下图源码,可以看到所有需要被StopReplica
的副本都是被过滤了一遍,获取它们所在的BrokerId; 最后调用的时候也是sendRequest(brokerId, stopReplicaRequest)
;根据获取到的BrokerId发起的请求
删除过程有Broker不在线 或者执行失败怎么办
Controller会继续删除操作;或者等Broker上线然后继续删除操作; 反正就是一定会保证所有的分区都被删除(被标记了–delete)之后才会把zk上的数据清理掉;
ReplicaStateMachine 副本状态机
在重新分配的过程中,如果执行删除操作会怎么样
删除操作会等待,等待重新分配完成之后,继续进行删除操作
Finally: 本文阅读源码为 Kafka-2.5
作者石臻臻,工作8年的互联网老兵,丰富的开发和管理经验,全网「 粉丝数4万 」,
先后从事 「 电商 」、「 中间件 」、「 大数据」 等工作
现在任职于「 滴滴技术专家 」岗位,从事开源建设工作
目前在维护 个人公众号「 石臻臻的杂货铺 」 ; 关注公众号会有「 日常送书活动 」;
欢迎进「 高质量 」 「 滴滴开源技术答疑群 」 , 群内每周技术专家轮流值班答疑
===============================可帮忙「 内推 」一二线大厂 ===============================