本章我们讲解一下基本的脚本工具使用,按照不同的角度出发

1. broker的管理

基本的启动时利用 kafka-server-start.sh

bin/kafka-server-start.sh -daemon <path>/server.properties

需要注意,不加入-daemon,其是不会进入后天运行的

如果不想加入-daemon,那么可以使用 nohup … &来启动。

对于broker的关闭,则是一个大问题

正确的关闭broker进程,如果不是以daemon这样的后台方式启动的,那么可以直接 CTRL+C即可

如果是后台运行,那么就需要使用 bin/kafka-server-stop.sh,这个脚本会利用grep来进行查找所有的kakfa接口,并进行关闭。

如果需要关闭指定的Kafka,可以自行查找Kafka对应的PID,然后利用kill -s TERM $PID来关闭

暴露broker的JMX端口

可以利用

Export JMX_PORT=9997 bin/kafka-server-start.sh -daemon <path>/server.properties

对于broker的增加

直接为broker新增一个broker.id,然后进行启动,不过缺点是新增的broker不会被分配任何老的topic分区,需要手动进行重分配。

Broker版本的升级

其实升级很简单

首先是确定broker之间的通信版本和消息版本

需要确定broker的server.properties中是否包含key

Inter.broker.protocol.version

Log.message.format.version

然后更新代码,进行broker的重启

下载新的broker,进行依次重启

更新通信版本和消息版本

Inter.broker.protocol.version=0.10.2

Log.message.format.version=0.10.2

这样就升级完成了。

2. topic的管理

对于topic创建的方式,主要有以下四种

执行kafka-topics.sh命令行工具创建

显式发送CreateTopicsRequest来创建Topic

发送MetadataRequest请求,且设置了auto.create.topics.enable为true

通过Zookeeper来向/brokers/topics路径下写入以topic名称命名的子节点

上面四种方法中,官方社区推荐使用前两种方法创建topic,

如果使用前一种方法来创建topic,脚本的可选参数包含下面的列表

图片

比如我们创建一个topic,名称为 test1,配置为6个分区,每个分区三个副本,留存时间为3天,

/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partition 6 –replication-factor 3 –topic test1 –config delete.retention.ms=259200000

或者我们手动指定分区在集群上的分配

/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partition 6 –replication-factor 3 –topic test2

–replica-assignment 0:1,1:2,0:2,1:2

删除topic

删除topic当前有3个方式,

使用kafka-topics脚本

构建DeleteTopicsRequest请求

直接向ZooKeeper的/admin/delete_topics下写入子节点

不过并不推荐使用直接写入ZooKeeper中

而使用其他方式,执行命令一般是立刻返回,这时候并没有直接删除topic数据,而是需要用户多等待一段时间后才能观察到topic数据被删除。

而且删除topic,也需要设置delete.topic.enable参数为true.

获取topic的列表

可以使用Kafka的kafka-topics脚本来获取集群的topic列表。

bin/kafka-topics.sh –zookeeper localhost:2181 –list

查询topic的详情

利用kafka-topics脚本来看topic的具体详情

bin/kafka-topics.sh –zookeeper localhost:2181 –topic test1 –describe

输出了内部包含了哪些分区,哪些同步的副本,哪些ISR

修改topic本身的信息,比如增加分区,使用bin/kafka-topics.sh

bin/kafka-topics.sh –alter –zookeeper localhost:2181 –partition 10 –topic test1

这样就会进行分区的增加,而且现在Kafka并不支持减少分区数,如果设置一个小于当前分区数的数量,会被抛出异常。

而如果希望修改某些topic级别的参数呢,可以使用kafka-configs脚本来设置,

其可以添加,修改,删除不同实体的参数

在kafak-configs脚本中,包含了四种实体,分别是topic user clients broker

那么我们如何为已经有的topic添加一个topic的参数,假设要为上面的test2来添加配置

cleanup.policy

bin/kafka-config.sh –zookeeper localhost:2181 -alter –entity-type topics  –entity-name test-topics2 –add-config cleanup.policy=compact

而且由于topics是kafka中的虚拟概念,所以对于其的设置更新,是不需要重启broker的。

查看topic的参数,则可以使用kafak-topics 或者 kafak-configs脚本

如果是使用kafka-topics脚本来查看配置的话

基本使用如下的命令

bin/kafka-topics.sh -describe –zookeeper localhost:2181 –topic test

使用kafka-configs脚本来看

bin/kafka-configs.sh –describe –zookeeper localhost:2181 –-entity-type topics –entity-name test

删除配置则是使用kafka-configs脚本进行操作的

bin/kafka-configs.sh –zookeeper localhost:2181 –alter –entity-type topics –entity-name test –delete-config preallocate

图片

consumer管理相关脚本

查看消费者组的脚本为kafka-consumer-groups.sh

这个脚本的主要功能参数为

图片

上面bootstrap-server和zookeeper 两者是不可同时指定的,分别对应着新旧的consumer版本

这里我们测试list功能,查看所有的消费者组。

bin/kafka-consumer-groups.sh -–bootstrap-server localhost:9092 –-list

然后分别查看不同消费组的详情

bin/kafka-consumer-groups.sh -–bootstrap-server localhost:9092 –describe -group test1

其中展示的内容有,TOPIC-消费者组消费的topic

PARTITION 消费者组消费了那个分区

CURRENT-OFFSET 消费者组最新消费的位移值

LOG-END-OFFSET topic所有分区当前日志终端位移值

LAG 消费滞后进度,就是LOG-END-OFFSET 减去 CURRENT-OFFSET的值

一般是大于等于0的正数,如果是负数,说明消息还没被消费就被consumer跳过了。

CONSUMER-ID consumer的id,一般为自动生成的,如果没有这个值,说明consumer还没在运行中

HOST: consumer所在的主机信息,如果没有值,说明还没在运行

CLIENT-ID 自动生成的一个客户端ID

然后是重设消费者组的位移

利用脚本来为consumer group重新设置位移,但是要求consumer group不能处于运行状态

相关脚本是kafka-consumer-groups

常见参数可以分为三类

作用域相关,明确修改消费者组中哪些topic

–all-topics 所有topics

–topic t1,–topic t2 若干个topic

–topic t1:0,1,2 指定分区

重设规则

–to-earliest 设置到最早位移处

–to-latest设置到最晚位移处

–to-current 设置到当前位移处

–to-offset<offset> 设置到指定位移处

–shift-by N位移调整到当前位移除+N的位置

–to-datetime <datetime>设置到给定时间的最早位移处

–by-duration 设定到距离当前时间制定间隔的位移处

–from-file 从csv中读取调整策略

最后可以确定是否执行

–execute 执行,不加就是打印调整方案

–export 保存为CSV并输出到控制台

比如下面的命令

bin/kafka-consumer-groups.sh –bootstrap-server localhost:9092 –group test1 –reset-offsets –all-topics –to-earliest –execute

删除消费者组,同样也是使用kafka-consumer-groups脚本,来删除处于inactive版本的消费者组

bin/kafka-consumer-groups.sh –zookeeper localhost:2181 –delete –group test1

但实际使用中,因为有offset.retention.minutes的参数,所以会自动移除inactive的位移信息,毕竟位移信息保存在内部topic中

由于一个topic下,往往具有多个分区,每个分区还有同步的概念,同步就带来了leader和follower的副本差异,为了避免Kafka中小部分的broker承载了大量的分区leader副本,所以引入了首选副本preferred replica.

一般来说,第一次分配副本的时候,当选的leader副本就是preferred replica,

kafka preferred replica的变更脚本使用的士 kafka-preferred-replica-election脚本

这个脚本需要一个json文件,来确定如何调整分区,比如下面的json文件

图片

之后使用如下命令即可

bin/kafka-preferred-replica-election.sh –zookeeper localhost:2181 –path-to-json-file <path>/preferred-leader-plan.json

之后的preferred replica就会被调整了。当然除了手动调整外,还有kafka提供了broker端参数来帮助用户自动执行preferred leader,利用的参数是 leader.imbalance.per.broker.percentage

根据百分比来进行perferred leader选举。超过10%,就进行preferred leader选举。

分区的重分配,是为了在新的broker加入,将topic重新分配更加均匀的。

我们来演示如何进行重分配,

bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –topics-to-move-json-file test.json –broker-list “5,6” –generate

这样就会给与一个新的分配方案,但是并没有执行,而是希望保存为一个新的json文件,来进行后续真正的重分配操作。

图片

说实话, 一定要谨慎的发起分区重分配操作,因为不同broker之间数据迁移会占用很大的broker机器带宽.

对于client端的脚本有kafka-console-producer脚本

kafka-console-producer脚本是模拟producer进行消息的生产,支持的参数有

图片

如果我们希望发送消息,可以指定producer的ack为all,使用LZ4消息压缩,失败重试次数为10次,linger.ms设置为3秒,命令为

bin/kafka-console-producer.sh –broker-list  localhost:9092 –topic test –compression-codec lz4 –request-required-acks all –timeout 3000 –message-send-max-retries 10

需要注意,使用脚本就不用指定自定义序列化类,不需要手动指定serializer.

同样kafka-console-consumer脚本是用来模拟消费者的消费.

图片

kafka-console-consumer脚本的使用基本如下.

bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning

这里我们不需要制定group id,会自动生成一个group ID.

kafka-run-class是很多kafka脚本实现的基石.其中包含了一些很有用的脚本工具.

比如查看元数据,而元数据信息是包含消息位移,创建时间戳,压缩类型,字节数等

kafka提供了这样的功能,为kafka.tools.DumpLogSegments

基本的使用如下

bin/kafka-run-class.sh kafak.tools.DumpLogSegments –files ../datalogs/kafka_1/t1-0/0000000000000000000.log

其中会显示日志中位移信息,物理文件位置,消息长度,压缩类型 ,CRC码等元数据信息.

获取topic当前消息数

用户希望能够实时的了解topic生产了多少条消息,Kafka提供了GetOffsetShell来统计消息总数.

基本的使用如下

bin/kafka-run-class.sh kafka.tools.GetOffsetShell –broker-list localhost:9092 –topic test –time -1

其中time -1会表示获取topic的所有分区最大位移, -2表示最早位移.

获取consumer_offset

即使consumer的位移保存在Kafka的内部topic中,也是可以通过 kafka-simple-consumer-shell.sh来查看

不过因为__consumer_offsets的topic中,位移会被在某一个子分区上,首先确定了子分区,然后进行查询topic位移信息.

bin/kafka-simple-consumer-shell.sh –topic __consumer_offset –partition 12 –broker-list localhost 9092 –formatter “kafka.coordinator.GroupMetadataManager $OffsetMessageFormatter”

得到的是结果如下

图片

每一行展示了某一个分区的当前的已提交位移,提交时间等信息.

最后我们给出上面脚本和对应功能的一个表格:

脚本 功能
Kafka-server-start.sh 启动broker
Kafka-server-start.sh 设置JMX端口
Kafka-server-stop.sh 关闭broker
Kafka-topics.sh 创建topic
Kafka-topics.sh 修改topic参数
Kafka-topics.sh 删除topic
Kafka-topics.sh 查询topic列表
Kafka-topics.sh 查询topic详情
Kafka-configs.sh 查看 topic user clients,broker等配置
Kafka-configs.sh 修改 topic user clients,broker等配置
Kafka-configs.sh 删除 topic user clients,broker等配置
Kafka-consumer-groups.sh 查询consumer消费者组相关状态
Kafka-consumer-groups.sh 重设消费者组位移
Kafka-consumer-groups.sh 删除消费者组
Kafka-preferred-replica-election.sh 进行preferred leader选举
Kafka-reassign-partitions.sh 重分区
Kafka-console-producer 模拟producer
Kafka-console-consumer 模拟consumer
Kakfa-run-class 查看消息元数据
Kakfa-run-class 查看topic当前消息数
Kakfa-run-class 查询__consumer_offsets

发表评论

邮箱地址不会被公开。 必填项已用*标注