1.对于Producer,我们需要明确消息会被指定到topic的哪个分区中,Producer提供了默认的分区器,其会通过hash算法将key映射到相同的partition,如果没有指定key,那么就会利用轮询的方式来确保topic在所有的分区上均匀的分配.

但是我们可以自定义分区机制,通过实现一个自定义的分区器即可.

创建一个自定义的分区器,只需要实现对应的Partitioner即可,

这个接口的入参具有topic,key,keyBytes,value,valueBytes,cluster

我们接下来创建一个具有特定规则的分区器.

我们假设有些key会携带字符串audit,如果遇到这些消息,就将他们发送到topic的最后一个分区,对于其他的消息,则还是使用随机的方式来发送到不同的分区上.

基本代码如下

图片

图片

其中我们获取了topic对应了Cluster的信息,然后根据key中的包含,来返回对应的partition数字.

那么我们创建了一个自定义的分区策略,对应的使用方式如下

props.put(“partitioner.class”,”com.xxx.yyy.producer.MyPartitioner”)

这样我们就自定义了一个消息分区策略.

2.消息序列化

则是kafka为了实现将消息以字节的方式进行发送,而提供的功能,分为了key序列化器和value序列化器.而对应的consumer中包含key解码器和value解码器.

图片

而Kafka提供了十几种序列器,常见的如下

ByteArraySerializer,什么都不做

ByteBufferSerializer,序列化ByteBuffer

BytesSerializer,序列化Kafka自定义的Bytes

StringSerializer 序列化String类型

使用也很简单,只需要在配置中指定就可以了.

如果需要 自定义实现一个序列器,只需要编写一个自定义的Serializer

这需要实现Serializer接口.

其中的入参无非就是 topic,data, 并要求返回一个byte数组.

3.Producer拦截器

用于在消息发送前对整个消息进行一个定制化处理,并且可以指定多个intercetpor形成一个拦截器链.对应的实现接口是ProducerIntercetpor

其中方法有

onSend 在计算分区调用这个方法,用于修改消息,最好不要修改消息所属的topic和分区

onAcknowledgement 消息被应答之前或者消息发送失败时候调用,因为运行在producerIO发送线程,不要在其中放入很重的逻辑,不然拖慢producer消息发送效率.

close,执行一些资源清理工作.

我们实现一个拦截器

图片

然后在interceptor之后,在producer主程序中指定.

图片

图片

从而进行拦截器链的使用.

接下来我们说下一个如何配置一个不会丢失数据的Kafka集群.

这需要我们分别配置broker端和producer端.

我们分别说两端的配置,首先是producer端

acks = all 需要等待所有follower都响应了消息之后才认为提交成功

retries = 一个较大值,给与一个较大值,来进行多次重试

max.in.flight.requests.per.connection = 1

限制一个broker上能够发送的未响应的数量,甚至为1,保证在某个broker发送响应后无法再给broker发送producer请求.

确定使用带有回调的send

如果失败之后在callback中立刻关闭producer

对于broker端配置

首先是关闭非ISR中的副本来选举为leader.

relication.factor 设置多份,来保存分区的消息

min.insync.replicas 大于1 ,设置大于1是提高可用性

最后设置replication.factor > min.insync.replicas

因为两者相等,那么只要有一个副本挂掉,那么分区就无法正常工作,为了保证可用性,所以建议如上设置.

对于消息压缩

则是一种通过提高CPU使用率来降低磁盘使用率和带宽使用率,

而且在broker端,并不会对消息进行解压缩,那么就是producer压缩,broker端保持,consumer端解压缩.

对于不同的压缩算法的选择建议是查看Kafka的官网,来进行压缩算法的选择.

最后对于实际项目中,Producer的使用,基本分为了多线程单实例使用,多线程多实例使用.

对于单实例,顾名思义,就是一个Producer订阅多个topic,然后交给多个线程来使用.

多实例则更为简单,就是每个线程构建自己的producer实例,来保证可用.

而对两种的对比

图片

基本上,如果分区数量不多,那么推荐使用第一种

如果拥有很多的分区数量,推荐使用二种方式.

根据上面所讲,基本可以阐述Producer在项目中的基本使用.

发表评论

邮箱地址不会被公开。 必填项已用*标注