我们来说一下在Kafka早期版本到现在版本的生产者和消费者

原本Kafka的Producer和Consumer是由Scala实现的,后来利用java版本进行了迭代和替换

基本在kafka 1.0.0之后都替换为了新版本的。

新版本的producer不再依赖于Zookeeper了,基本的逻辑工作图如下

图片

新版本会将消息封装为一个ProducerRecord对象,并利用KafkaProduer.send方法进行发送,然后交给Proudcer对象,进行序列化,并根据元数据信息,写入消息缓冲池,最后交给一个Send线程来发送到broker上。

这样,就将用户的主线程和Sender线程解耦,逻辑上更容易把控。

其次是异步发送消息,提供回调机制来判断发送是否成功

批处理的方式发送,提高了吞吐量

优化了分区策略,在老版本中,对于没有指定key的消息来说,会在一段时间后发送到固定分区,新版本采用轮询,消息发送更加均匀化。

关于新版本的API,如下图所示

图片

重点在于其中的send:消息发送的主方法,close:关闭producer,正确的关闭对于程序来说至关重要

Metrics:获取producer的实时监控数据,比如速率。

而对于老版本的producer而言,其消息发送是同步,这就导致必须要一条条的发送,因此老版本的producer的吞吐量很慢。整体的工作流程如下图。

图片

提供的API也很有限,基本如下图

图片

整体只提供了send和close两个方法,设计是非常简陋的。

其次是新版本的consumer和老版本的consumer,新版本的consumer也脱离了Zookeeper的依赖,老版本中,消费位移都保存在Zookeeper中,而在新版本中,位移的管理和保存不再依赖Zookeeper了,自然瓶颈就消失了。

新版本Consumer消费主要依靠的是Kafka的poll方法,这一点类似Linux的epoll,通过一个线程来管理多个链接broker的socket。避免了一个broker一个线程的设计

其次是位移不在保存在Zookeeper中,而是单独保存在一个内部topic中,避免了Zookeeper的频繁读写,也是利用了Kafka自身的高可用机制

最后是改进了消费者组的概念,增加了一个集中式协调者 coordinator的角色,所有成员的管理交给coordinator负责。

提供的API也更加丰富

图片

其中的重点是poll读取消息的核心方法。

还有subscribe 订阅哪些topic的分区

commitSync和commitAsync 手动提交位移

seek 设置位移方法

而对于老版本的consumer而言,其中让人诟病的是分为了high-level consumer 和 low-level consumer

其中low-level consumer是指的单个consumer,没有什么消费者概念,也就是单个消费者,不产生任何的关联。

其api设计的如下

图片

其中还提供了send方法,这个send不是发送消息,而是发送一些具体的请求。

而high-level consumer则是有着只能从上次保存的位移处开始读取消息的问题,无法实现高度定制化的消费策略,所以也是死板的。

基本的API如下

图片

从上面的角度来看,建议大家在任何情况下都是用新版本的kafka。

发表评论

邮箱地址不会被公开。 必填项已用*标注