本章我们讲解一下基本的脚本工具使用,按照不同的角度出发
1. broker的管理
基本的启动时利用 kafka-server-start.sh
bin/kafka-server-start.sh -daemon <path>/server.properties
需要注意,不加入-daemon,其是不会进入后天运行的
如果不想加入-daemon,那么可以使用 nohup … &来启动。
对于broker的关闭,则是一个大问题
正确的关闭broker进程,如果不是以daemon这样的后台方式启动的,那么可以直接 CTRL+C即可
如果是后台运行,那么就需要使用 bin/kafka-server-stop.sh,这个脚本会利用grep来进行查找所有的kakfa接口,并进行关闭。
如果需要关闭指定的Kafka,可以自行查找Kafka对应的PID,然后利用kill -s TERM $PID来关闭
暴露broker的JMX端口
可以利用
Export JMX_PORT=9997 bin/kafka-server-start.sh -daemon <path>/server.properties
对于broker的增加
直接为broker新增一个broker.id,然后进行启动,不过缺点是新增的broker不会被分配任何老的topic分区,需要手动进行重分配。
Broker版本的升级
其实升级很简单
首先是确定broker之间的通信版本和消息版本
需要确定broker的server.properties中是否包含key
Inter.broker.protocol.version
Log.message.format.version
然后更新代码,进行broker的重启
下载新的broker,进行依次重启
更新通信版本和消息版本
Inter.broker.protocol.version=0.10.2
Log.message.format.version=0.10.2
这样就升级完成了。
2. topic的管理
对于topic创建的方式,主要有以下四种
执行kafka-topics.sh命令行工具创建
显式发送CreateTopicsRequest来创建Topic
发送MetadataRequest请求,且设置了auto.create.topics.enable为true
通过Zookeeper来向/brokers/topics路径下写入以topic名称命名的子节点
上面四种方法中,官方社区推荐使用前两种方法创建topic,
如果使用前一种方法来创建topic,脚本的可选参数包含下面的列表
比如我们创建一个topic,名称为 test1,配置为6个分区,每个分区三个副本,留存时间为3天,
/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partition 6 –replication-factor 3 –topic test1 –config delete.retention.ms=259200000
或者我们手动指定分区在集群上的分配
/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partition 6 –replication-factor 3 –topic test2
–replica-assignment 0:1,1:2,0:2,1:2
删除topic
删除topic当前有3个方式,
使用kafka-topics脚本
构建DeleteTopicsRequest请求
直接向ZooKeeper的/admin/delete_topics下写入子节点
不过并不推荐使用直接写入ZooKeeper中
而使用其他方式,执行命令一般是立刻返回,这时候并没有直接删除topic数据,而是需要用户多等待一段时间后才能观察到topic数据被删除。
而且删除topic,也需要设置delete.topic.enable参数为true.
获取topic的列表
可以使用Kafka的kafka-topics脚本来获取集群的topic列表。
bin/kafka-topics.sh –zookeeper localhost:2181 –list
查询topic的详情
利用kafka-topics脚本来看topic的具体详情
bin/kafka-topics.sh –zookeeper localhost:2181 –topic test1 –describe
输出了内部包含了哪些分区,哪些同步的副本,哪些ISR
修改topic本身的信息,比如增加分区,使用bin/kafka-topics.sh
bin/kafka-topics.sh –alter –zookeeper localhost:2181 –partition 10 –topic test1
这样就会进行分区的增加,而且现在Kafka并不支持减少分区数,如果设置一个小于当前分区数的数量,会被抛出异常。
而如果希望修改某些topic级别的参数呢,可以使用kafka-configs脚本来设置,
其可以添加,修改,删除不同实体的参数
在kafak-configs脚本中,包含了四种实体,分别是topic user clients broker
那么我们如何为已经有的topic添加一个topic的参数,假设要为上面的test2来添加配置
cleanup.policy
bin/kafka-config.sh –zookeeper localhost:2181 -alter –entity-type topics –entity-name test-topics2 –add-config cleanup.policy=compact
而且由于topics是kafka中的虚拟概念,所以对于其的设置更新,是不需要重启broker的。
查看topic的参数,则可以使用kafak-topics 或者 kafak-configs脚本
如果是使用kafka-topics脚本来查看配置的话
基本使用如下的命令
bin/kafka-topics.sh -describe –zookeeper localhost:2181 –topic test
使用kafka-configs脚本来看
bin/kafka-configs.sh –describe –zookeeper localhost:2181 –-entity-type topics –entity-name test
删除配置则是使用kafka-configs脚本进行操作的
bin/kafka-configs.sh –zookeeper localhost:2181 –alter –entity-type topics –entity-name test –delete-config preallocate
consumer管理相关脚本
查看消费者组的脚本为kafka-consumer-groups.sh
这个脚本的主要功能参数为
上面bootstrap-server和zookeeper 两者是不可同时指定的,分别对应着新旧的consumer版本
这里我们测试list功能,查看所有的消费者组。
bin/kafka-consumer-groups.sh -–bootstrap-server localhost:9092 –-list
然后分别查看不同消费组的详情
bin/kafka-consumer-groups.sh -–bootstrap-server localhost:9092 –describe -group test1
其中展示的内容有,TOPIC-消费者组消费的topic
PARTITION 消费者组消费了那个分区
CURRENT-OFFSET 消费者组最新消费的位移值
LOG-END-OFFSET topic所有分区当前日志终端位移值
LAG 消费滞后进度,就是LOG-END-OFFSET 减去 CURRENT-OFFSET的值
一般是大于等于0的正数,如果是负数,说明消息还没被消费就被consumer跳过了。
CONSUMER-ID consumer的id,一般为自动生成的,如果没有这个值,说明consumer还没在运行中
HOST: consumer所在的主机信息,如果没有值,说明还没在运行
CLIENT-ID 自动生成的一个客户端ID
然后是重设消费者组的位移
利用脚本来为consumer group重新设置位移,但是要求consumer group不能处于运行状态
相关脚本是kafka-consumer-groups
常见参数可以分为三类
作用域相关,明确修改消费者组中哪些topic
–all-topics 所有topics
–topic t1,–topic t2 若干个topic
–topic t1:0,1,2 指定分区
重设规则
–to-earliest 设置到最早位移处
–to-latest设置到最晚位移处
–to-current 设置到当前位移处
–to-offset<offset> 设置到指定位移处
–shift-by N位移调整到当前位移除+N的位置
–to-datetime <datetime>设置到给定时间的最早位移处
–by-duration 设定到距离当前时间制定间隔的位移处
–from-file 从csv中读取调整策略
最后可以确定是否执行
–execute 执行,不加就是打印调整方案
–export 保存为CSV并输出到控制台
比如下面的命令
bin/kafka-consumer-groups.sh –bootstrap-server localhost:9092 –group test1 –reset-offsets –all-topics –to-earliest –execute
删除消费者组,同样也是使用kafka-consumer-groups脚本,来删除处于inactive版本的消费者组
bin/kafka-consumer-groups.sh –zookeeper localhost:2181 –delete –group test1
但实际使用中,因为有offset.retention.minutes的参数,所以会自动移除inactive的位移信息,毕竟位移信息保存在内部topic中
由于一个topic下,往往具有多个分区,每个分区还有同步的概念,同步就带来了leader和follower的副本差异,为了避免Kafka中小部分的broker承载了大量的分区leader副本,所以引入了首选副本preferred replica.
一般来说,第一次分配副本的时候,当选的leader副本就是preferred replica,
kafka preferred replica的变更脚本使用的士 kafka-preferred-replica-election脚本
这个脚本需要一个json文件,来确定如何调整分区,比如下面的json文件
之后使用如下命令即可
bin/kafka-preferred-replica-election.sh –zookeeper localhost:2181 –path-to-json-file <path>/preferred-leader-plan.json
之后的preferred replica就会被调整了。当然除了手动调整外,还有kafka提供了broker端参数来帮助用户自动执行preferred leader,利用的参数是 leader.imbalance.per.broker.percentage
根据百分比来进行perferred leader选举。超过10%,就进行preferred leader选举。
分区的重分配,是为了在新的broker加入,将topic重新分配更加均匀的。
我们来演示如何进行重分配,
bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –topics-to-move-json-file test.json –broker-list “5,6” –generate
这样就会给与一个新的分配方案,但是并没有执行,而是希望保存为一个新的json文件,来进行后续真正的重分配操作。
说实话, 一定要谨慎的发起分区重分配操作,因为不同broker之间数据迁移会占用很大的broker机器带宽.
对于client端的脚本有kafka-console-producer脚本
kafka-console-producer脚本是模拟producer进行消息的生产,支持的参数有
如果我们希望发送消息,可以指定producer的ack为all,使用LZ4消息压缩,失败重试次数为10次,linger.ms设置为3秒,命令为
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test –compression-codec lz4 –request-required-acks all –timeout 3000 –message-send-max-retries 10
需要注意,使用脚本就不用指定自定义序列化类,不需要手动指定serializer.
同样kafka-console-consumer脚本是用来模拟消费者的消费.
kafka-console-consumer脚本的使用基本如下.
bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning
这里我们不需要制定group id,会自动生成一个group ID.
kafka-run-class是很多kafka脚本实现的基石.其中包含了一些很有用的脚本工具.
比如查看元数据,而元数据信息是包含消息位移,创建时间戳,压缩类型,字节数等
kafka提供了这样的功能,为kafka.tools.DumpLogSegments
基本的使用如下
bin/kafka-run-class.sh kafak.tools.DumpLogSegments –files ../datalogs/kafka_1/t1-0/0000000000000000000.log
其中会显示日志中位移信息,物理文件位置,消息长度,压缩类型 ,CRC码等元数据信息.
获取topic当前消息数
用户希望能够实时的了解topic生产了多少条消息,Kafka提供了GetOffsetShell来统计消息总数.
基本的使用如下
bin/kafka-run-class.sh kafka.tools.GetOffsetShell –broker-list localhost:9092 –topic test –time -1
其中time -1会表示获取topic的所有分区最大位移, -2表示最早位移.
获取consumer_offset
即使consumer的位移保存在Kafka的内部topic中,也是可以通过 kafka-simple-consumer-shell.sh来查看
不过因为__consumer_offsets的topic中,位移会被在某一个子分区上,首先确定了子分区,然后进行查询topic位移信息.
bin/kafka-simple-consumer-shell.sh –topic __consumer_offset –partition 12 –broker-list localhost 9092 –formatter “kafka.coordinator.GroupMetadataManager $OffsetMessageFormatter”
得到的是结果如下
每一行展示了某一个分区的当前的已提交位移,提交时间等信息.
最后我们给出上面脚本和对应功能的一个表格:
脚本 | 功能 |
Kafka-server-start.sh | 启动broker |
Kafka-server-start.sh | 设置JMX端口 |
Kafka-server-stop.sh | 关闭broker |
Kafka-topics.sh | 创建topic |
Kafka-topics.sh | 修改topic参数 |
Kafka-topics.sh | 删除topic |
Kafka-topics.sh | 查询topic列表 |
Kafka-topics.sh | 查询topic详情 |
Kafka-configs.sh | 查看 topic user clients,broker等配置 |
Kafka-configs.sh | 修改 topic user clients,broker等配置 |
Kafka-configs.sh | 删除 topic user clients,broker等配置 |
Kafka-consumer-groups.sh | 查询consumer消费者组相关状态 |
Kafka-consumer-groups.sh | 重设消费者组位移 |
Kafka-consumer-groups.sh | 删除消费者组 |
Kafka-preferred-replica-election.sh | 进行preferred leader选举 |
Kafka-reassign-partitions.sh | 重分区 |
Kafka-console-producer | 模拟producer |
Kafka-console-consumer | 模拟consumer |
Kakfa-run-class | 查看消息元数据 |
Kakfa-run-class | 查看topic当前消息数 |
Kakfa-run-class | 查询__consumer_offsets |