创建Topic的源码解析
kafka管控推荐使用 滴滴开源 的 Kafka运维管控平台 更符合国人的操作习惯 ,
更强大的管控能力 ,更高效的问题定位能力 、更便捷的集群运维能力 、更专业的资源治理 、 更友好的运维生态
配套视频,请食用
脚本参数
sh bin/kafka-topic -help 查看更具体参数
下面只是列出了跟 --create 相关的参数
| 参数 | 描述 | 例子 | 
|---|---|---|
| --bootstrap-server 指定kafka服务 | 指定连接到的kafka服务; 如果有这个参数,则 --zookeeper可以不需要 | –bootstrap-server localhost:9092 | 
| --zookeeper | 弃用, 通过zk的连接方式连接到kafka集群; | –zookeeper localhost:2181 或者localhost:2181/kafka | 
| --replication-factor  | 副本数量,注意不能大于broker数量;如果不提供,则会用集群中默认配置 | –replication-factor 3 | 
| --partitions | 分区数量 | 当创建或者修改topic的时候,用这个来指定分区数;如果创建的时候没有提供参数,则用集群中默认值; 注意如果是修改的时候,分区比之前小会有问题 | 
| --replica-assignment  | 副本分区分配方式;创建topic的时候可以自己指定副本分配情况; | --replica-assignmentBrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0  ; 这个意思是有三个分区和三个副本,对应分配的Broker; 逗号隔开标识分区;冒号隔开表示副本 | 
| --config <String: name=value> | 用来设置topic级别的配置以覆盖默认配置;只在–create 和–bootstrap-server 同时使用时候生效; 可以配置的参数列表请看文末附件 | 例如覆盖两个配置 --config retention.bytes=123455 --config retention.ms=600001 | 
| --command-config<String: command    文件路径> | 用来配置客户端Admin Client启动配置,只在–bootstrap-server 同时使用时候生效; | 例如:设置请求的超时时间 --command-config config/producer.proterties ; 然后在文件中配置 request.timeout.ms=300000 | 
| --create | 命令方式; 表示当前请求是创建Topic | --create | 
创建Topic脚本
zk方式(不推荐)
| 1 | bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test | 
需要注意的是–zookeeper后面接的是kafka的zk配置, 假如你配置的是localhost:2181/kafka 带命名空间的这种,不要漏掉了
kafka版本 >= 2.2 支持下面方式(推荐)
| 1 | bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test | 
当前分析的kafka源码版本为 kafka-2.5
创建Topic 源码分析
温馨提示: 如果阅读源码略显枯燥,你可以直接看源码总结以及后面部分
首先我们找到源码入口处, 查看一下 kafka-topic.sh脚本的内容exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
最终是执行了kafka.admin.TopicCommand这个类,找到这个地方之后就可以断点调试源码了,用IDEA启动
记得配置一下入参
比如: --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic test_create_topic3
 
1. 源码入口

上面的源码主要作用是
- 根据是否有传入参数--zookeeper来判断创建哪一种 对象topicService
 如果传入了--zookeeper则创建 类ZookeeperTopicService的对象
 否则创建类AdminClientTopicService的对象(我们主要分析这个对象)
- 根据传入的参数类型判断是创建topic还是删除等等其他 判断依据是 是否在参数里传入了--create
2. 创建AdminClientTopicService 对象
val topicService = new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer))
2.1 先创建 Admin
| 1 | object AdminClientTopicService { | 
- 如果有入参--command-config,则将这个文件里面的参数都放到mapcommandConfig里面, 并且也加入bootstrap.servers的参数;假如配置文件里面已经有了bootstrap.servers配置,那么会将其覆盖
- 将上面的commandConfig作为入参调用Admin.create(commandConfig)创建 Admin; 这个时候调用的Client模块的代码了, 从这里我们就可以看出,我们调用kafka-topic.sh脚本实际上是kafka模拟了一个客户端Client来创建Topic的过程; 
3. AdminClientTopicService.createTopic 创建Topic
        topicService.createTopic(opts)
| 1 | case class AdminClientTopicService private (adminClient: Admin) extends TopicService { | 
- 检查各项入参是否有问题
- adminClient.listTopics(),然后比较是否已经存在待创建的Topic;如果存在抛出异常;
- 判断是否配置了参数--replica-assignment; 如果配置了,那么Topic就会按照指定的方式来配置副本情况
- 解析配置--config配置放到configsMap中;configsMap给到NewTopic对象
- 调用adminClient.createTopics创建Topic; 它是如何创建Topic的呢?往下分析源码
3.1 KafkaAdminClient.createTopics(NewTopic) 创建Topic
| 1 | 
 | 
这个代码里面主要看下Call里面的接口; 先不管Kafka如何跟服务端进行通信的细节; 我们主要关注创建Topic的逻辑;
- createRequest会构造一个请求参数- CreateTopicsRequest例如下图 
- 选择ControllerNodeProvider这个节点发起网络请求 
 可以清楚的看到, 创建Topic这个操作是需要Controller来执行的; 
4. 发起网络请求
5. Controller角色的服务端接受请求处理逻辑
首先找到服务端处理客户端请求的 源码入口 ⇒ KafkaRequestHandler.run()
主要看里面的 apis.handle(request) 方法; 可以看到客户端的请求都在request.bodyAndSize()里面
5.1 KafkaApis.handle(request) 根据请求传递Api调用不同接口
进入方法可以看到根据request.header.apiKey  调用对应的方法,客户端传过来的是CreateTopics
5.2 KafkaApis.handleCreateTopicsRequest 处理创建Topic的请求
| 1 | def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = { | 
- 判断当前处理的broker是不是Controller,如果不是Controller的话直接抛出异常,从这里可以看出,CreateTopic这个操作必须是Controller来进行, 出现这种情况有可能是客户端发起请求的时候Controller已经变更;
- 鉴权 【Kafka源码】kafka鉴权机制
- 调用adminManager.createTopics()
5.3 adminManager.createTopics()
创建主题并等等主题完全创建,回调函数将会在超时、错误、或者主题创建完成时触发
该方法过长,省略部分代码
| 1 | def createTopics(timeout: Int, | 
- 做一些校验检查 
 ①.检查Topic是否存在
 ②. 检查- --replica-assignment参数和 (- --partitions || --replication-factor) 不能同时使用
 ③.如果(- --partitions || --replication-factor) 没有设置,则使用 Broker的配置(这个Broker肯定是Controller)
 ④.计算分区副本分配方式
- createTopicPolicy根据Broker是否配置了创建Topic的自定义校验策略; 使用方式是自定义实现- org.apache.kafka.server.policy.CreateTopicPolicy接口;并 在服务器配置- create.topic.policy.class.name=自定义类; 比如我就想所有创建Topic的请求分区数都要大于10; 那么这里就可以实现你的需求了
- createTopicWithAssignment把topic相关数据写入到zk中; 进去分析一下
5.4 写入zookeeper数据
我们进入到             adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)看看有哪些数据写入到了zk中;
| 1 | def createTopicWithAssignment(topic: String, | 
源码就不再深入了,这里直接详细说明一下
写入Topic配置信息
- 先调用SetDataRequest请求往节点/config/topics/Topic名称写入数据; 这里
 一般这个时候都会返回NONODE (NoNode);节点不存在; 假如zk已经存在节点就直接覆盖掉
- 节点不存在的话,就发起CreateRequest请求,写入数据; 并且节点类型是持久节点
这里写入的数据,是我们入参时候传的topic配置--config; 这里的配置会覆盖默认配置
写入Topic分区副本信息
- 将已经分配好的副本分配策略写入到 /brokers/topics/Topic名称中; 节点类型 持久节点 
具体跟zk交互的地方在ZookeeperClient.send() 这里包装了很多跟zk的交互;
6. Controller监听 /brokers/topics/Topic名称, 通知Broker将分区写入磁盘
Controller 有监听zk上的一些节点; 在上面的流程中已经在zk中写入了
/brokers/topics/Topic名称; 这个时候Controller就监听到了这个变化并相应;
KafkaController.processTopicChange
| 1 | private def processTopicChange(): Unit = { //如果处理的不是Controller角色就返回 if (!isActive) return //从zk中获取 `/brokers/topics 所有Topic val topics = zkClient.getAllTopicsInCluster //找出哪些是新增的 val newTopics = topics -- controllerContext.allTopics //找出哪些Topic在zk上被删除了 val deletedTopics = controllerContext.allTopics -- topics controllerContext.allTopics = topics registerPartitionModificationsHandlers(newTopics.toSeq) val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics) deletedTopics.foreach(controllerContext.removeTopic) addedPartitionReplicaAssignment.foreach { case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment) } info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " + s"[$addedPartitionReplicaAssignment]") if (addedPartitionReplicaAssignment.nonEmpty) onNewPartitionCreation(addedPartitionReplicaAssignment.keySet) } | 
- 从zk中获取 - /brokers/topics所有Topic跟当前Broker内存中所有Broker- controllerContext.allTopics的差异; 就可以找到我们新增的Topic; 还有在zk中被删除了的Broker(该Topic会在当前内存中remove掉)
- 从zk中获取 - /brokers/topics/{TopicName}给定主题的副本分配。并保存在内存中 
- 执行 - onNewPartitionCreation;分区状态开始流转
6.1 onNewPartitionCreation 状态流转
关于Controller的状态机 详情请看: 【kafka源码】Controller中的状态机
| 1 | /** * This callback is invoked by the topic change callback with the list of failed brokers as input. * It does the following - * 1. Move the newly created partitions to the NewPartition state * 2. Move the newly created partitions from NewPartition->OnlinePartition state */ private def onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit = { info(s"New partition creation callback for ${newPartitions.mkString(",")}") partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition) replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica) partitionStateMachine.handleStateChanges( newPartitions.toSeq, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy(false)) ) replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica) } | 
- 将待创建的分区状态流转为NewPartition; 
- 将待创建的副本 状态流转为NewReplica; 
- 将分区状态从刚刚的NewPartition流转为OnlinePartition
- 获取leaderIsrAndControllerEpochs; Leader为副本的第一个;1. 向zk中写入`/brokers/topics/{topicName}/partitions/` 持久节点; 无数据 2. 向zk中写入`/brokers/topics/{topicName}/partitions/{分区号}` 持久节点; 无数据 3. 向zk中写入`/brokers/topics/{topicName}/partitions/{分区号}/state` 持久节点; 数据为`leaderIsrAndControllerEpoch`- 向副本所属Broker发送leaderAndIsrRequest请求
- 向所有Broker发送UPDATE_METADATA请求
 
- 向副本所属Broker发送
- 将副本状态从刚刚的NewReplica流转为OnlineReplica,更新下内存
关于分区状态机和副本状态机详情请看【kafka源码】Controller中的状态机
7. Broker收到LeaderAndIsrRequest 创建本地Log
上面步骤中有说到向副本所属Broker发送
leaderAndIsrRequest请求,那么这里做了什么呢
其实主要做的是 创建本地Log代码太多,这里我们直接定位到只跟创建Topic相关的关键代码来分析
KafkaApis.handleLeaderAndIsrRequest->replicaManager.becomeLeaderOrFollower->ReplicaManager.makeLeaders...LogManager.getOrCreateLog
| 1 | /** * 如果日志已经存在,只返回现有日志的副本否则如果 isNew=true 或者如果没有离线日志目录,则为给定的主题和给定的分区创建日志 否则抛出 KafkaStorageException */ def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false, isFuture: Boolean = false): Log = { logCreationOrDeletionLock synchronized { getLog(topicPartition, isFuture).getOrElse { // create the log if it has not already been created in another thread if (!isNew && offlineLogDirs.nonEmpty) throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline") val logDirs: List[File] = { val preferredLogDir = preferredLogDirs.get(topicPartition) if (isFuture) { if (preferredLogDir == null) throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory") else if (getLog(topicPartition).get.dir.getParent == preferredLogDir) throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition") } if (preferredLogDir != null) List(new File(preferredLogDir)) else nextLogDirs() } val logDirName = { if (isFuture) Log.logFutureDirName(topicPartition) else Log.logDirName(topicPartition) } val logDir = logDirs .toStream // to prevent actually mapping the whole list, lazy map .map(createLogDirectory(_, logDirName)) .find(_.isSuccess) .getOrElse(Failure(new KafkaStorageException("No log directories available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", ")))) .get // If Failure, will throw val log = Log( dir = logDir, config = config, logStartOffset = 0L, recoveryPoint = 0L, maxProducerIdExpirationMs = maxPidExpirationMs, producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, scheduler = scheduler, time = time, brokerTopicStats = brokerTopicStats, logDirFailureChannel = logDirFailureChannel) if (isFuture) futureLogs.put(topicPartition, log) else currentLogs.put(topicPartition, log) info(s"Created log for partition $topicPartition in $logDir with properties " + s"{${config.originals.asScala.mkString(", ")}}.") // Remove the preferred log dir since it has already been satisfied preferredLogDirs.remove(topicPartition) log } } } | 
- 如果日志已经存在,只返回现有日志的副本否则如果 isNew=true 或者如果没有离线日志目录,则为给定的主题和给定的分区创建日志 否则抛出KafkaStorageException
详细请看 【kafka源码】LeaderAndIsrRequest请求
源码总结
如果上面的源码分析,你不想看,那么你可以直接看这里的简洁叙述
- 根据是否有传入参数--zookeeper来判断创建哪一种 对象topicService
 如果传入了--zookeeper则创建 类ZookeeperTopicService的对象
 否则创建类AdminClientTopicService的对象(我们主要分析这个对象)
- 如果有入参--command-config,则将这个文件里面的参数都放到mapl类型commandConfig里面, 并且也加入bootstrap.servers的参数;假如配置文件里面已经有了bootstrap.servers配置,那么会将其覆盖
- 将上面的commandConfig作为入参调用Admin.create(commandConfig)创建 Admin; 这个时候调用的Client模块的代码了, 从这里我们就可以猜测,我们调用kafka-topic.sh脚本实际上是kafka模拟了一个客户端Client来创建Topic的过程;
- 一些异常检查
 ①.如果配置了副本副本数–replication-factor 一定要大于0
 ②.如果配置了–partitions 分区数 必须大于0
 ③.去zk查询是否已经存在该Topic
- 判断是否配置了参数--replica-assignment; 如果配置了,那么Topic就会按照指定的方式来配置副本情况
- 解析配置--config配置放到configsMap中; configsMap给到NewTopic对象
- 将上面所有的参数包装成一个请求参数CreateTopicsRequest;然后找到是Controller的节点发起请求(ControllerNodeProvider)
- 服务端收到请求之后,开始根据CreateTopicsRequest来调用创建Topic的方法; 不过首先要判断一下自己这个时候是不是Controller; 有可能这个时候Controller重新选举了; 这个时候要抛出异常
- 服务端进行一下请求参数检查
 ①.检查Topic是否存在
 ②.检查--replica-assignment参数和 (--partitions||--replication-factor) 不能同时使用
- 如果(--partitions||--replication-factor) 没有设置,则使用 Broker的默认配置(这个Broker肯定是Controller)
- 计算分区副本分配方式;如果是传入了 --replica-assignment;则会安装自定义参数进行组装;否则的话系统会自动计算分配方式; 具体详情请看 【kafka源码】创建Topic的时候是如何分区和副本的分配规则
- createTopicPolicy根据Broker是否配置了创建Topic的自定义校验策略; 使用方式是自定义实现- org.apache.kafka.server.policy.CreateTopicPolicy接口;并 在服务器配置- create.topic.policy.class.name=自定义类; 比如我就想所有创建Topic的请求分区数都要大于10; 那么这里就可以实现你的需求了
- zk中写入Topic配置信息 发起CreateRequest请求,这里写入的数据,是我们入参时候传的topic配置--config; 这里的配置会覆盖默认配置;并且节点类型是持久节点;path =/config/topics/Topic名称
- zk中写入Topic分区副本信息 发起CreateRequest请求 ,将已经分配好的副本分配策略 写入到/brokers/topics/Topic名称中; 节点类型 持久节点
- Controller监听zk上面的topic信息; 根据zk上变更的topic信息;计算出新增/删除了哪些Topic; 然后拿到新增Topic的 副本分配信息; 并做一些状态流转
- 向新增Topic所在Broker发送leaderAndIsrRequest请求,
- Broker收到发送leaderAndIsrRequest请求; 创建副本Log文件;

Q&A
创建Topic的时候 在Zk上创建了哪些节点
接受客户端请求阶段:
- topic的配置信息
/config/topics/Topic名称持久节点- topic的分区信息
/brokers/topics/Topic名称持久节点Controller监听zk节点
/brokers/topics变更阶段
/brokers/topics/{topicName}/partitions/持久节点; 无数据- 向zk中写入
/brokers/topics/{topicName}/partitions/{分区号}持久节点; 无数据- 向zk中写入
/brokers/topics/{topicName}/partitions/{分区号}/state持久节点;
创建Topic的时候 什么时候在Broker磁盘上创建的日志文件
当Controller监听zk节点
/brokers/topics变更之后,将新增的Topic 解析好的分区状态流转NonExistentPartition->NewPartition->OnlinePartition当流转到OnlinePartition的时候会像分区分配到的Broker发送一个leaderAndIsrRequest请求,当Broker们收到这个请求之后,根据请求参数做一些处理,其中就包括检查自身有没有这个分区副本的本地Log;如果没有的话就重新创建;
如果我没有指定分区数或者副本数,那么会如何创建
我们都知道,如果我们没有指定分区数或者副本数, 则默认使用Broker的配置, 那么这么多Broker,假如不小心默认值配置不一样,那究竟使用哪一个呢? 那肯定是哪台机器执行创建topic的过程,就是使用谁的配置;
所以是谁执行的? 那肯定是Controller啊! 上面的源码我们分析到了,创建的过程,会指定Controller这台机器去进行;
如果我手动删除了/brokers/topics/下的某个节点会怎么样?
如果我手动在zk中添加/brokers/topics/{TopicName}节点会怎么样
先说结论: 根据上面分析过的源码画出的时序图可以指定; 客户端发起创建Topic的请求,本质上是去zk里面写两个数据
- topic的配置信息
/config/topics/Topic名称持久节点- topic的分区信息
/brokers/topics/Topic名称持久节点
所以我们绕过这一步骤直接去写入数据,可以达到一样的效果;不过我们的数据需要保证准确
因为在这一步已经没有了一些基本的校验了; 假如这一步我们写入的副本Brokerid不存在会怎样,从时序图中可以看到,leaderAndIsrRequest请求; 就不会正确的发送的不存在的BrokerId上,那么那台机器就不会创建Log文件;下面不妨让我们来验证一下;
创建一个节点/brokers/topics/create_topic_byhand_zk节点数据为下面数据;
这里我用的工具PRETTYZOO手动创建的,你也可以用命令行创建;
创建完成之后我们再看看本地有没有生成一个Log文件
可以看到我们指定的Broker,已经生成了对应的分区副本Log文件;
而且zk中也写入了其他的数据在我们写入zk数据的时候,就已经确定好了哪个每个分区的Leader是谁了,那就是第一个副本默认为Leader
如果写入/brokers/topics/{TopicName}节点之后Controller挂掉了会怎么样
先说结论:Controller 重新选举的时候,会有一些初始化的操作; 会把创建过程继续下去
启动状态机的过程是不是跟上面的6.1 onNewPartitionCreation 状态流转 的过程很像; 最终都把状态流转到了
OnlinePartition; 伴随着是不发起了leaderAndIsrRequest请求; 是不是Broker收到请求之后,创建本地Log文件了创建Topic的时候 Leader选举策略是什么样子的?
TODO..
附件
–config 可生效参数
请以
sh bin/kafka-topic -help为准
Tips:如果关于本篇文章你有疑问,可以在评论区留下,或者直接加我微信
PS: 文章阅读的源码版本是kafka-2.5
作者石臻臻,工作8年的互联网老兵,丰富的开发和管理经验,全网「 粉丝数4万 」,
先后从事 「 电商 」、「 中间件 」、「 大数据」 等工作
现在任职于「 滴滴技术专家 」岗位,从事开源建设工作
目前在维护 个人公众号「 石臻臻的杂货铺 」 ; 关注公众号会有「 日常送书活动 」;
欢迎进「 高质量 」 「 滴滴开源技术答疑群 」 , 群内每周技术专家轮流值班答疑
===============================可帮忙「 内推 」一二线大厂 ===============================

















