本文所有命令,博主均全部操作验证过,保证准确性; 非复制粘贴拼凑文章; 如果想了解更多工具命令,可在评论区留下评论,博主会择期,加上;


kafka管控推荐使用 滴滴开源 Kafka运维管控平台 更符合国人的操作习惯 ,

更强大的管控能力 ,更高效的问题定位能力更便捷的集群运维能力更专业的资源治理更友好的运维生态 ..

TopicCommand Topic相关

1.查询所有Topic列表

1
sh bin/kafka-topics.sh  --bootstrap-server xxxxxx:9092 --list --exclude-internal

2.查询匹配Topic列表(正则表达式)

查询test_create_开头的所有Topic列表

1
sh bin/kafka-topics.sh  --bootstrap-server xxxxxx:9092 --list --exclude-internal  --topic "test_create_.*"

相关可选参数

参数 描述 例子
--exclude-internal 排除kafka内部topic,比如__consumer_offsets-* --exclude-internal
--topic 可以正则表达式进行匹配,展示topic名称 --topic

1.查询单个Topic

1
sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal

2.批量查询Topic(正则表达式匹配,下面是查询所有Topic)

1
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server xxxx:9092 --describe --exclude-internal

支持正则表达式匹配Topic,只需要将topic 用双引号包裹起来


相关可选参数

参数 描述 例子
--bootstrap-server 指定kafka服务 指定连接到的kafka服务; 如果有这个参数,则 --zookeeper可以不需要 –bootstrap-server localhost:9092
--at-min-isr-partitions 查询的时候省略一些计数和配置信息 --at-min-isr-partitions
--exclude-internal 排除kafka内部topic,比如__consumer_offsets-* --exclude-internal
--topics-with-overrides 仅显示已覆盖配置的主题,也就是单独针对Topic设置的配置覆盖默认配置;不展示分区信息 --topics-with-overrides

1
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test

相关可选参数

参数 描述 例子
--bootstrap-server 指定kafka服务 指定连接到的kafka服务; 如果有这个参数,则 --zookeeper可以不需要 –bootstrap-server localhost:9092
--zookeeper 弃用, 通过zk的连接方式连接到kafka集群; –zookeeper localhost:2181 或者localhost:2181/kafka
--replication-factor 副本数量,注意不能大于broker数量;如果不提供,则会用集群中默认配置 –replication-factor 3
--partitions 分区数量,当创建或者修改topic的时候,用这个来指定分区数;如果创建的时候没有提供参数,则用集群中默认值; 注意如果是修改的时候,分区比之前小会有问题 –partitions 3
--replica-assignment 副本分区分配方式;创建topic的时候可以自己指定副本分配情况; --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个副本,对应分配的Broker; 逗号隔开标识分区;冒号隔开表示副本
--config <String: name=value> 用来设置topic级别的配置以覆盖默认配置;只在–create 和–bootstrap-server 同时使用时候生效; 可以配置的参数列表请看文末附件 例如覆盖两个配置 --config retention.bytes=123455 --config retention.ms=600001
--command-config <String: command 文件路径> 用来配置客户端Admin Client启动配置,只在–bootstrap-server 同时使用时候生效; 例如:设置请求的超时时间 --command-config config/producer.proterties ; 然后在文件中配置 request.timeout.ms=300000
1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test

支持正则表达式匹配Topic来进行删除,只需要将topic 用双引号包裹起来
例如: 删除以create_topic_byhand_zk为开头的topic;

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic "create_topic_byhand_zk.*"

.表示任意匹配除换行符 \n 之外的任何单字符。要匹配 . ,请使用 . 。
·*·:匹配前面的子表达式零次或多次。要匹配 * 字符,请使用 *。
.* : 任意字符

删除任意Topic (慎用)

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic ".*?" 

更多的用法请参考正则表达式

相关配置

配置 描述 默认
file.delete.delay.ms topic删除被标记为–delete文件之后延迟多长时间删除正在的Log文件 60000
delete.topic.enable true 是否能够删除topic

zk方式(不推荐)

1
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

kafka版本 >= 2.2 支持下面方式(推荐)

单个Topic扩容

1
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4

批量扩容 (将所有正则表达式匹配到的Topic分区扩容到4个)

1
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4

".*?" 正则表达式的意思是匹配所有; 您可按需匹配

PS: 当某个Topic的分区少于指定的分区数时候,他会抛出异常;但是不会影响其他Topic正常进行;


相关可选参数

参数 描述 例子
--replica-assignment 副本分区分配方式;创建topic的时候可以自己指定副本分配情况; --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个副本,对应分配的Broker; 逗号隔开标识分区;冒号隔开表示副本

PS: 虽然这里配置的是全部的分区副本分配配置,但是正在生效的是新增的分区;
比如: 以前3分区1副本是这样的

Broker-1 Broker-2 Broker-3 Broker-4
0 1 2

现在新增一个分区,--replica-assignment 2,1,3,4 ; 看这个意思好像是把0,1号分区互相换个Broker

Broker-1 Broker-2 Broker-3 Broker-4
1 0 2 3

但是实际上不会这样做,Controller在处理的时候会把前面3个截掉; 只取新增的分区分配方式,原来的还是不会变

Broker-1 Broker-2 Broker-3 Broker-4
0 1 2 3

ConfigCommand 动态配置

Config相关操作; 动态配置可以覆盖默认的静态配置;

在动态配置中,很多--entity-type是可以配置默认动态配置的,比如针对所有Topic配置,针对所有Broker配置等等,如果没有配置针对单个Topic或Broker的配置就会用这个默认的动态配置;

默认配置的参数: --entity-default

1
sh bin/kafka-configs.sh --bootstrap-server xxxxx:9090 --alter --entity-type brokers --entity-default --add-config log.segment.bytes=88888888

动态配置的默认配置是使用了节点 <defalut>; 哪些有默认动态配置可以看下图

在这里插入图片描述

优先级: 指定动态配置>默认动态配置>静态配置

展示关于Topic的动静态配置

1.查询单个Topic配置(只列举动态配置)

1
sh bin/kafka-configs.sh --describe  --bootstrap-server xxxxx:9092 --topic test_create_topic

或者

1
sh bin/kafka-configs.sh --describe  --bootstrap-server xxxx:9092 --entity-type topics --entity-name test_create_topic

2.查询所有Topic配置(包括内部Topic)(只列举动态配置)

1
sh bin/kafka-configs.sh --describe  --bootstrap-server xxxx:9092 --entity-type topics 

在这里插入图片描述
在这里插入图片描述

3.查询Topic的详细配置(动态+静态)

只需要加上一个参数--all

其他配置/clients/users/brokers/broker-loggers 的查询

同理 ;只需要将--entity-type 改成对应的类型就行了 (topics/clients/users/brokers/broker-loggers)

在这里插入图片描述

查询kafka 脚本版本信息

1
sh bin/kafka-configs.sh  --version

所有可配置的动态配置 请看最后面的 附件 部分

--alter

--add-config k1=v1,k2=v2

--entity-type: 实体类型 (topics/clients/users/brokers/broker- loggers)

--entity-name: 实体名称

1
sh bin/kafka-configs.sh   --bootstrap-server xxxxx:9092 --alter  --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999 

修改跟新增命令都是一样的, 修改的时候会将配置的kv键值对覆盖旧的

--alter

--delete-config k1,k2

--entity-type: 实体类型 (topics/clients/users/brokers/broker- loggers)

--entity-name: 实体名称

1
sh bin/kafka-configs.sh   --bootstrap-server xxxxx:9092 --alter  --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms 
1
sh bin/kafka-configs.sh   --bootstrap-server xxxxx:9092 --alter  --entity-type brokers  --entity-default --add-config log.segment.bytes=788888888 --delete-config log.retention.ms 

五类动态配置说明

在这里插入图片描述

也可执行命令kafka-configs.sh -help 查看当前版本可配置的参数

key value 示例
consumer_byte_rate
producer_byte_rate
request_percentage
key value 示例
advertised.listeners
background.threads
compression.type
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.message.downconversion.enable
log.message.timestamp.difference.max.ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
replica.alter.log.dirs.io.max.bytes.per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
unclean.leader.election.enable
key value 示例
cleanup.policy 清理策略
compression.type 压缩类型(通常建议在produce端控制)
delete.retention.ms 压缩日志的保留时间
file.delete.delay.ms topic删除被标记为–delete文件之后延迟多长时间删除正在的Log文件 60000
flush.messages 持久化message限制
flush.ms 持久化频率
follower.replication.throttled.replicas flowwer副本限流 格式:分区号:副本follower号,分区号:副本follower号 0:1,1:1
index.interval.bytes
leader.replication.throttled.replicas leader副本限流 格式:分区号:副本Leader号 0:0
max.compaction.lag.ms
max.message.bytes 最大的batch的message大小
message.downconversion.enable message是否向下兼容
message.format.version message格式版本
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas 最小的ISR
preallocate
retention.bytes 日志保留大小(通常按照时间限制)
retention.ms 日志保留时间
segment.bytes segment的大小限制
segment.index.bytes
segment.jitter.ms
segment.ms segment的切割时间
unclean.leader.election.enable 是否允许非同步副本选主
key value 示例
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate 针对消费者user进行限流
producer_byte_rate 针对生产者进行限流
request_percentage 请求百分比

副本扩缩、分区迁移、跨路径迁移 kafka-reassign-partitions

请戳

  1. 【kafka运维】kafka-reassign-partitions.sh分区副本重分配、数据迁移、副本扩缩容 (附教学视频)
  2. 【kafka实战】分区重分配可能出现的问题和排查问题思路(生产环境实战,附视频)

Topic的发送kafka-console-producer.sh

1 生产无key消息

1
2
3

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

2 生产有key消息
加上属性--property parse.key=true

1
2
3

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --property parse.key=true

默认消息key与消息value间使用“Tab键”进行分隔,如下, 如果k   v 不全 会抛出异常

生产消息有key,并且用逗号“ ,”隔开,并且消息格式不对也忽略抛出异常

加上属性--property parse.key=true --property key.separator=, --property=ignore.error=true

1
--bootstrap-server xxxx:9090 --topic topic_test --property parse.key=true --property key.separator=, --property=ignore.error=true

3. 配置更多生产者配置

--producer.config config/producer.properties 通过这个配置增加


可选参数

参数 值类型 说明 有效值
--bootstrap-server String 要连接的服务器必需(除非指定–broker-list) 如:host1:prot1,host2:prot2
--topic String (必需)接收消息的主题名称
--batch-size Integer 单个批处理中发送的消息数 200(默认值)
--compression-codec String 压缩编解码器 none、gzip(默认值)snappy、lz4、zstd
--max-block-ms Long 在发送请求期间,生产者将阻止的最长时间 60000(默认值)
--max-memory-bytes Long 生产者用来缓冲等待发送到服务器的总内存 33554432(默认值)
--max-partition-memory-bytes Long 为分区分配的缓冲区大小 16384
--message-send-max-retries Integer 最大的重试发送次数 3
--metadata-expiry-ms Long 强制更新元数据的时间阈值(ms) 300000
--producer-property String 额外的生产者配置属性, 配置Producer.这个优先级高于--producer.config
--producer.config String 生产者配置属性文件[–producer-property]优先于此配置 配置文件完整路径
--property String 定义一些消息读取器的属性,消息读取器是 line-reader配置的;默认就是 读取输入框的消息 默认的消息生产器有下面三个属性可以配置
1. parse.key=true/false 是否解析key(消息发送设置key)
2. key.separator=<key.separator> (key和value之间的分割器 默认是\t Tab健分割)
3. ignore.error=true/false 是否忽略异常(读取消息的时候,如果没有读取到key,则忽略key缺失这个异常)

例如生产带有key的消息,key和value用逗号“,”分割并忽略异常 : --property parse.key=true --property ignore.error=true --property key.separator=,
看下图就知道,这三个属性的作用了
--request-required-acks String 生产者请求的确认方式 0、1(默认值)、all
--request-timeout-ms Integer 生产者请求的确认超时时间 1500(默认值)
--retry-backoff-ms Integer 生产者重试前,刷新元数据的等待时间阈值 100(默认值)
--socket-buffer-size Integer TCP接收缓冲大小 102400(默认值)
--timeout Integer 消息排队异步等待处理的时间阈值 1000(默认值)
--sync 同步发送消息 默认 : false
--version 显示 Kafka 版本 不配合其他参数时,显示为本地Kafka版本
--help 打印帮助信息
--line-reader 输出阅读器 default: kafka.tools. ConsoleProducer$LineMessageReader

Topic的消费kafka-console-consumer.sh

1. 新客户端从头消费--from-beginning (注意这里是新客户端,如果之前已经消费过了是不会从头消费的)
下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费

1
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test  --from-beginning 

2. 正则表达式匹配topic进行消费--whitelist
消费所有的topic

1
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --whitelist '.*' 

消费所有的topic,并且还从头消费

1
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --whitelist '.*'   --from-beginning 

3.显示key进行消费--property print.key=true

1
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test  --property print.key=true

4. 指定分区消费--partition 指定起始偏移量消费--offset

1
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset	100

5. 给客户端命名--group

注意给客户端命名之后,如果之前有过消费,那么--from-beginning 就不会再从头消费了

1
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test  --group test-group

6. 添加客户端属性--consumer-property

这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性--group test-group 那肯定不行

1
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test  --consumer-property group.id=test-consumer-group

7. 添加客户端属性--consumer.config

--consumer-property 一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面, --consumer-property 的优先级大于 --consumer.config

1
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test   --consumer.config config/consumer.properties

参数 描述 例子
--group 指定消费者所属组的ID
--topic 被消费的topic
--partition 指定分区 ;除非指定–offset,否则从分区结束(latest)开始消费 --partition 0
--offset 执行消费的起始offset位置 ;默认值: latest; /latest /earliest /偏移量 --offset 10
--whitelist 正则表达式匹配topic;--topic就不用指定了; 匹配到的所有topic都会消费; 当然用了这个参数,--partition --offset等就不能使用了
--consumer-property 将用户定义的属性以key=value的形式传递给使用者 --consumer-property group.id=test-consumer-group
--consumer.config 消费者配置属性文件请注意,[consumer-property]优先于此配置 --consumer.config config/consumer.properties
--property 初始化消息格式化程序的属性 print.timestamp=true,false 、print.key=true,false 、print.value=true,false 、key.separator=<key.separator> 、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer>
--from-beginning 从存在的最早消息开始,而不是从最新消息开始,注意如果配置了客户端名称并且之前消费过,那就不会从头消费了
--max-messages 消费的最大数据量,若不指定,则持续消费下去 --max-messages 100
--skip-message-on-error 如果处理消息时出错,请跳过它而不是暂停
--isolation-level 设置为read_committed以过滤掉未提交的事务性消息,设置为read_uncommitted以读取所有消息,默认值:read_uncommitted
--formatter kafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter

kafka-leader-election Leader重新选举

1 指定Topic指定分区用重新PREFERRED:优先副本策略 进行Leader重选举

1
2
3

sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0

2 所有Topic所有分区用重新PREFERRED:优先副本策略 进行Leader重选举

1
2
sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred  --all-topic-partitions

3 设置配置文件批量指定topic和分区进行Leader重选举

先配置leader-election.json文件

1
2
3
4
5
6
7
8
9
10
11
12
13

{
"partitions": [
{
"topic": "test_create_topic4",
"partition": 1
},
{
"topic": "test_create_topic4",
"partition": 2
}
]
}
1
2
3

sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred --path-to-json-file config/leader-election.json


相关可选参数

参数 描述 例子
--bootstrap-server 指定kafka服务 指定连接到的kafka服务 –bootstrap-server localhost:9092
--topic 指定Topic,此参数跟--all-topic-partitionspath-to-json-file 三者互斥
--partition 指定分区,跟--topic搭配使用
--election-type 两个选举策略(PREFERRED: 优先副本选举,如果第一个副本不在线的话会失败;UNCLEAN: 策略)
--all-topic-partitions 所有topic所有分区执行Leader重选举; 此参数跟--topicpath-to-json-file 三者互斥
--path-to-json-file 配置文件批量选举,此参数跟--topicall-topic-partitions 三者互斥

持续批量推送消息kafka-verifiable-producer.sh

1. 单次发送100条消息--max-messages 100

一共要推送多少条,默认为-1,-1表示一直推送到进程关闭位置

1
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server  localhost:9092 `--max-messages 100`

2. 每秒发送最大吞吐量不超过消息 --throughput 100

推送消息时的吞吐量,单位messages/sec。默认为-1,表示没有限制

1
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server   localhost:9092  --throughput 100

3. 发送的消息体带前缀--value-prefix

1
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server   localhost:9092  --value-prefix 666

注意 --value-prefix 666必须是整数,发送的消息体的格式是加上一个 点号. 例如: 666.

其他参数:
--producer.config CONFIG_FILE 指定producer的配置文件
--acks ACKS 每次推送消息的ack值,默认是-1

持续批量拉取消息kafka-verifiable-consumer

1. 持续消费

1
sh bin/kafka-verifiable-consumer.sh --group-id test_consumer  --bootstrap-server  localhost:9092   --topic test_create_topic4 

2. 单次最大消费10条消息--max-messages 10

1
sh bin/kafka-verifiable-consumer.sh --group-id test_consumer  --bootstrap-server  localhost:9092  --topic test_create_topic4 --max-messages 10 

相关可选参数

参数 描述 例子
--bootstrap-server 指定kafka服务 指定连接到的kafka服务; –bootstrap-server localhost:9092
--topic 指定消费的topic
--group-id 消费者id;不指定的话每次都是新的组id
group-instance-id 消费组实例ID,唯一值
--max-messages 单次最大消费的消息数量
--enable-autocommit 是否开启offset自动提交;默认为false
--reset-policy 当以前没有消费记录时,选择要拉取offset的策略,可以是earliest, latest,none。默认是earliest
--assignment-strategy consumer分配分区策略,默认是org.apache.kafka.clients.consumer.RangeAssignor
--consumer.config 指定consumer的配置文件

生产者压力测试kafka-producer-perf-test.sh

1. 发送1024条消息--num-records 100并且每条消息大小为1KB--record-size 1024 最大吞吐量每秒10000条--throughput 100

1
sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 1024 --throughput 100000  --producer-props bootstrap.servers=localhost:9092 --record-size 1024

你可以通过LogIKM查看分区是否增加了对应的数据大小
在这里插入图片描述
LogIKM 可以看到发送了1024条消息; 并且总数据量=1M; 1024条*1024byte = 1M;

2. 用指定消息文件--payload-file 发送100条消息最大吞吐量每秒100条--throughput 100

  1. 先配置好消息文件batchmessage.txt

    在这里插入图片描述

  2. 然后执行命令

    发送的消息会从batchmessage.txt里面随机选择; 注意这里我们没有用参数--payload-delimeter指定分隔符,默认分隔符是\n换行;

    1
    bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 1024 --throughput 100  --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt




  1. 验证消息,可以通过 LogIKM 查看发送的消息

    在这里插入图片描述


相关可选参数

参数 描述 例子
--topic 指定消费的topic
--num-records 发送多少条消息
--throughput 每秒消息最大吞吐量
--producer-props 生产者配置, k1=v1 k2=v2 (中间空格) --producer-props bootstrap.servers= localhost:9092 client.id=test_client
--producer.config 生产者配置文件 --producer.config config/producer.propeties
--print-metrics 在test结束的时候打印监控信息,默认false --print-metrics true
--transactional-id 指定事务 ID,测试并发事务的性能时需要,只有在 –transaction-duration-ms > 0 时生效,默认值为 performance-producer-default-transactional-id
--transaction-duration-ms 指定事务持续的最长时间,超过这段时间后就会调用 commitTransaction 来提交事务,只有指定了 > 0 的值才会开启事务,默认值为 0
--record-size 一条消息的大小byte; 和 –payload-file 两个中必须指定一个,但不能同时指定
--payload-file 指定消息的来源文件,只支持 UTF-8 编码的文本文件,文件的消息分隔符通过 --payload-delimeter 指定,默认是用换行\nl来分割的,和 –record-size 两个中必须指定一个,但不能同时指定 ; 如果提供的消息
--payload-delimeter 如果通过 --payload-file 指定了从文件中获取消息内容,那么这个参数的意义是指定文件的消息分隔符,默认值为 \n,即文件的每一行视为一条消息;如果未指定--payload-file则此参数不生效;发送消息的时候是随机送文件里面选择消息发送的;

消费者压力测试kafka-consumer-perf-test.sh

消费100条消息 --messages 100

1
sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-server  localhost:9090 --messages 100

相关可选参数

参数 描述 例子
--bootstrap-server
--consumer.config 消费者配置文件
--date-format 结果打印出来的时间格式化 默认:yyyy-MM-dd HH:mm:ss:SSS
--fetch-size 单次请求获取数据的大小 默认1048576
--topic 指定消费的topic
--from-latest
--group 消费组ID
--hide-header 如果设置了,则不打印header信息
--messages 需要消费的数量
--num-fetch-threads feth 数据的线程数(废弃无效) 默认:1
--print-metrics 结束的时候打印监控数据
--show-detailed-stats 如果设置,则按照--report_interval配置的方式报告每个报告间隔的统计信息
--threads 消费线程数;(废弃无效) 默认 10
--reporting-interval 打印进度信息的时间间隔(以毫秒为单位)

删除指定分区的消息kafka-delete-records.sh

删除指定topic的某个分区的消息删除至offset为1024

先配置json文件offset-json-file.json

1
2
3
4
5
{"partitions":
[{"topic": "test1", "partition": 0,
"offset": 1024}],
"version":1
}

在执行命令

sh bin/kafka-delete-records.sh –bootstrap-server 172.23.250.249:9090 –offset-json-file config/offset-json-file.json

验证 通过 LogIKM 查看发送的消息

在这里插入图片描述
从这里可以看出来,配置"offset": 1024 的意思是从最开始的地方删除消息到 1024的offset; 是从最前面开始删除的

查看Broker磁盘信息kafka-log-dirs.sh

查询指定topic磁盘信息 --topic-list topic1,topic2

1
sh bin/kafka-log-dirs.sh  --bootstrap-server xxxx:9090 --describe  --topic-list test2

查询指定Broker磁盘信息--broker-list 0 broker1,broker2

1
sh bin/kafka-log-dirs.sh  --bootstrap-server xxxxx:9090 --describe  --topic-list test2 --broker-list 0

例如我一个3分区3副本的Topic的查出来的信息
logDir Broker中配置的log.dir

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
{
"version": 1,
"brokers": [{
"broker": 0,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 1,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 2,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 3,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3",
"error": null,
"partitions": []
}]
}]
}

如果你觉得通过命令查询磁盘信息比较麻烦,你也可以通过 LogIKM 查看
在这里插入图片描述

消费者组管理 kafka-consumer-groups.sh

1
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list

在这里插入图片描述

先调用MetadataRequest拿到所有在线Broker列表
再给每个Broker发送ListGroupsRequest请求获取 消费者组数据

DescribeGroupsRequest

1. 查看指定消费组详情--group或所有消费组详情--all-groups

查看指定消费组详情--group

1
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090  --describe --group test2_consumer_group

查看所有消费组详情--all-groups

1
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090  --describe --all-groups

查看该消费组 消费的所有Topic、及所在分区、最新消费offset、Log最新数据offset、Lag还未消费数量、消费者ID等等信息
在这里插入图片描述

2. 查询消费者成员信息--members

所有消费组成员信息

1
sh bin/kafka-consumer-groups.sh  --describe  --all-groups --members --bootstrap-server xxx:9090

指定消费组成员信息

1
sh bin/kafka-consumer-groups.sh --describe   --members  --group test2_consumer_group --bootstrap-server xxxx:9090

在这里插入图片描述

3. 查询消费者状态信息--state

所有消费组状态信息

1
sh bin/kafka-consumer-groups.sh  --describe  --all-groups --state --bootstrap-server xxxx:9090

指定消费组状态信息

1
sh bin/kafka-consumer-groups.sh  --describe   --state --group test2_consumer_group --bootstrap-server xxxxx:9090

在这里插入图片描述

DeleteGroupsRequest

删除消费组–delete

删除指定消费组--group

1
sh bin/kafka-consumer-groups.sh  --delete  --group test2_consumer_group   --bootstrap-server xxxx:9090

删除所有消费组--all-groups

1
sh bin/kafka-consumer-groups.sh  --delete  --all-groups   --bootstrap-server xxxx:9090

PS: 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报下面异常

1
2
3
Error: Deletion of some consumer groups failed:
* Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

能够执行成功的一个前提是 消费组这会是不可用状态;

下面的示例使用的参数是: --dry-run ;这个参数表示预执行,会打印出来将要处理的结果;
等你想真正执行的时候请换成参数--excute ;

下面示例 重置模式都是 --to-earliest 重置到最早的;

请根据需要参考下面 相关重置Offset的模式 换成其他模式;

重置指定消费组的偏移量 --group

重置指定消费组的所有Topic的偏移量--all-topic

1
sh bin/kafka-consumer-groups.sh  --reset-offsets  --to-earliest --group test2_consumer_group   --bootstrap-server xxxx:9090 --dry-run --all-topic

重置指定消费组的指定Topic的偏移量--topic

1
sh bin/kafka-consumer-groups.sh  --reset-offsets  --to-earliest --group test2_consumer_group   --bootstrap-server xxxx:9090 --dry-run --topic test2

重置所有消费组的偏移量 --all-group

重置所有消费组的所有Topic的偏移量--all-topic

1
2
sh bin/kafka-consumer-groups.sh  --reset-offsets  --to-earliest --all-group   --bootstrap-server xxxx:9090 --dry-run --all-topic

重置所有消费组中指定Topic的偏移量--topic

1
sh bin/kafka-consumer-groups.sh  --reset-offsets  --to-earliest --all-group   --bootstrap-server xxxx:9090 --dry-run --topic test2

--reset-offsets 后面需要接重置的模式

相关重置Offset的模式

参数 描述 例子
--to-earliest : 重置offset到最开始的那条offset(找到还未被删除最早的那个offset)
--to-current: 直接重置offset到当前的offset,也就是LOE
--to-latest 重置到最后一个offset
--to-datetime: 重置到指定时间的offset;格式为:YYYY-MM-DDTHH:mm:SS.sss; --to-datetime "2021-6-26T00:00:00.000"
--to-offset 重置到指定的offset,但是通常情况下,匹配到多个分区,这里是将匹配到的所有分区都重置到这一个值; 如果 1.目标最大offset<--to-offset, 这个时候重置为目标最大offset;2.目标最小offset>--to-offset ,则重置为最小; 3.否则的话才会重置为--to-offset的目标值; 一般不用这个 --to-offset 3465 在这里插入图片描述
--shift-by 按照偏移量增加或者减少多少个offset;正的为往前增加;负的往后退;当然这里也是匹配所有的; --shift-by 100--shift-by -100
--from-file 根据CVS文档来重置; 这里下面单独讲解

--from-file着重讲解一下

上面其他的一些模式重置的都是匹配到的所有分区; 不能够每个分区重置到不同的offset;不过--from-file可以让我们更灵活一点;

  1. 先配置cvs文档
    格式为: Topic:分区号: 重置目标偏移量

    1
    2
    3
    test2,0,100
    test2,1,200
    test2,2,300
  2. 执行命令

    sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv

能够执行成功的一个前提是 消费组这会是不可用状态;

偏移量被删除了之后,Consumer Group下次启动的时候,会从头消费;

1
sh bin/kafka-consumer-groups.sh  --delete-offsets  --group test2_consumer_group2  --bootstrap-server XXXX:9090 --topic test2

相关可选参数

参数 描述 例子
--bootstrap-server 指定连接到的kafka服务; –bootstrap-server localhost:9092
--list 列出所有消费组名称 --list
--describe 查询消费者描述信息 --describe
--group 指定消费组
--all-groups 指定所有消费组
--members 查询消费组的成员信息
--state 查询消费者的状态信息
--offsets 在查询消费组描述信息的时候,这个参数会列出消息的偏移量信息; 默认就会有这个参数的;
dry-run 重置偏移量的时候,使用这个参数可以让你预先看到重置情况,这个时候还没有真正的执行,真正执行换成--excute;默认为dry-run
--excute 真正的执行重置偏移量的操作;
--to-earliest 将offset重置到最早
to-latest 将offset重置到最近

查看日志文件 kafka-dump-log.sh

参数 描述 例子
--deep-iteration
--files <String: file1, file2, ...> 必需; 读取的日志文件 –files 0000009000.log
--key-decoder-class 如果设置,则用于反序列化键。这类应实现kafka.serializer。解码器特性。自定义jar应该是在kafka/libs目录中提供
--max-message-size 最大的数据量,默认:5242880
--offsets-decoder if set, log data will be parsed as offset data from the __consumer_offsets topic.
--print-data-log 打印内容
--transaction-log-decoder if set, log data will be parsed as transaction metadata from the __transaction_state topic
--value-decoder-class [String] if set, used to deserialize the messages. This class should implement kafka. serializer.Decoder trait. Custom jar should be available in kafka/libs directory. (default: kafka.serializer. StringDecoder)
--verify-index-only if set, just verify the index log without printing its content.

查询Log文件

sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.log
在这里插入图片描述

查询Log文件具体信息 --print-data-log

sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.log --print-data-log
在这里插入图片描述

查询index文件具体信息

sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.index
在这里插入图片描述
配置项为log.index.size.max.bytes; 来控制创建索引的大小;

查询timeindex文件

sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.timeindex
在这里插入图片描述

附件

ConfigCommand 的一些可选配置


Topic相关可选配置

key value 示例
cleanup.policy 清理策略
compression.type 压缩类型(通常建议在produce端控制)
delete.retention.ms 压缩日志的保留时间
file.delete.delay.ms topic删除被标记为–delete文件之后延迟多长时间删除正在的Log文件 60000
flush.messages 持久化message限制
flush.ms 持久化频率
follower.replication.throttled.replicas flowwer副本限流 格式:分区号:副本follower号,分区号:副本follower号 0:1,1:1
index.interval.bytes
leader.replication.throttled.replicas leader副本限流 格式:分区号:副本Leader号 0:0
max.compaction.lag.ms
max.message.bytes 最大的batch的message大小
message.downconversion.enable message是否向下兼容
message.format.version message格式版本
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas 最小的ISR
preallocate
retention.bytes 日志保留大小(通常按照时间限制)
retention.ms 日志保留时间
segment.bytes segment的大小限制
segment.index.bytes
segment.jitter.ms
segment.ms segment的切割时间
unclean.leader.election.enable 是否允许非同步副本选主

Broker相关可选配置

key value 示例
advertised.listeners
background.threads
compression.type
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.message.downconversion.enable
log.message.timestamp.difference.max.ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
replica.alter.log.dirs.io.max.bytes.per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
unclean.leader.election.enable

Users相关可选配置

key value 示例
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate 针对消费者user进行限流
producer_byte_rate 针对生产者进行限流
request_percentage 请求百分比

clients相关可选配置

key value 示例
consumer_byte_rate
producer_byte_rate
request_percentage

以上大部分运维操作,都可以使用 LogIKM 在平台上可视化操作;


欢迎 「Star」「共建」 滴滴开源 Kafka运维管控平台 更符合国人的操作习惯 ,

更强大的管控能力 ,更高效的问题定位能力更便捷的集群运维能力更专业的资源治理更友好的运维生态

More

Kafka专栏持续更新中…(源码、原理、实战、运维、视频、面试视频)


作者石臻臻,工作8年的互联网老兵,丰富的开发和管理经验,全网「 粉丝数4万 」,

先后从事 「 电商 」「 中间件 」「 大数据」 等工作

现在任职于「 滴滴技术专家 」岗位,从事开源建设工作

目前在维护 个人公众号「 石臻臻的杂货铺 」 ; 关注公众号会有「 日常送书活动 」;

欢迎进「 高质量 」 「 滴滴开源技术答疑群 」 , 群内每周技术专家轮流值班答疑

===============================可帮忙「 内推 」一二线大厂 ===============================