【kafka源码】/log_dir_event_notification的LogDir脱机事件通知
kafka管控推荐使用 滴滴开源 的 Kafka运维管控平台 更符合国人的操作习惯 ,
更强大的管控能力 ,更高效的问题定位能力 、更便捷的集群运维能力 、更专业的资源治理 、 更友好的运维生态
前言
我们会看到zk的数据中有一个节点
/log_dir_event_notification/
,这是一个序列号持久节点
这个节点在kafka中承担的作用是: 当某个Broker上的LogDir出现异常时(比如磁盘损坏,文件读写失败,等等异常): 向zk中谢增一个子节点/log_dir_event_notification/log_dir_event_序列号
;Controller监听到这个节点的变更之后,会向Brokers们发送LeaderAndIsrRequest
请求; 然后做一些副本脱机的善后操作
源码分析
这里说的dirLog是 server.properties中配置的log.dir
例如
副本异常处理
首先我们找到有使用这个节点的源码;
kafka启动之初有调用
ReplicaManager.startup()
1 | def startup(): Unit = { |
1 | // logDir should be an absolute path |
代码比较长,就直接概况一下好了:
主要是当读取或操作LogDir的时候出现了异常就会执行到这里,有可能是磁盘脱机了,或者文件突然没有读取写入权限等等之类的一些IOException
异常;那么 Broker就需要做一些处理;如下
- 做个判断
inter.broker.protocol.version
协议版本 <1.0
的时候 时候直接退出;那个时候还不支持单Broker上存在多个logDir; - 副本停止fetche数据
- 标记分区下线
- 可能移除一些监控信息
- 如果当前的
log_dir
都脱机(或者异常了), 那么久可以直接shutdown这台机器了 - 如果还有其他的
log_dir
还有在线的, 那么继续做一些其他的清理操作; - 创建持久序列节点
/log_dir_event_notification/log_dir_event_
+序列号;数据是 BrokerID;例如:
/log_dir_event_notification/log_dir_event_0000000003
1
{"version":1,"broker":20003,"event":1}
PS: log_dir
是可以在一台Broker配置多个路径的 ,用逗号隔开
LogDir发生异常
比如说在 给文件加锁的时候lockLogDirs,磁盘损坏了就抛出异常IOException
1 | /** |
offlineLogDirQueue
添加了一个异常队列之后就回到上面的副本异常处理代码了, 上面可是一致在queue.take()
的
Controller监听zk节点变更
KafkaController.processLogDirEventNotification
1 | private def processLogDirEventNotification(): Unit = { |
主要将从zk节点 /log_dir_event_notification/log_dir_event_序列号
中获取到的数据的Broker上的所有副本进行一个副本状态流转 ->OnlineReplica
;关于状态机的流转请看 【kafka源码】Controller中的状态机
- 给所有broker 发送
LeaderAndIsrRequest
请求,让brokers们去查询他们的副本的状态,如果副本logDir已经离线则返回KAFKA_STORAGE_ERROR
异常; - 完事之后会删除节点
源码总结
Q&A
作者石臻臻,工作8年的互联网老兵,丰富的开发和管理经验,全网「 粉丝数4万 」,
先后从事 「 电商 」、「 中间件 」、「 大数据」 等工作
现在任职于「 滴滴技术专家 」岗位,从事开源建设工作
目前在维护 个人公众号「 石臻臻的杂货铺 」 ; 关注公众号会有「 日常送书活动 」;
欢迎进「 高质量 」 「 滴滴开源技术答疑群 」 , 群内每周技术专家轮流值班答疑
可帮忙「 内推 」一二线大厂