这次我们讲解一下Kafka的精准一次处理语义

首先我们确定常见的三种交付语义分别是

最多一次 at most once 消息可能丢失,最多只会被处理一次

最少一次 at least once 消息不会丢失,可能被处理多次

精准一次 exactly once 消息会被处理,且只会被处理一次。

当然,这三者的实现,需要大家根据实际的业务需求来确定,对于大量的日志数据,完全是没有必要使用exactly once的。

那么对于这三个交付语义,我们从consumer和producer两个角度来说,

对于at least once,producer发送消息后,假设网络出现故障没有响应,生产者很有可能再次发送消息,导致一条消息被发送多次,这一点,producer是天然的at least once

对于consumer端,不同语义的实现是根据consumer的提交时机确定的,如果consumer在消息处理完成才提交,那么就是at least once,如果是在处理之前就提交,那么就是at most once。

而精准一次,则需要依赖于新版本引入的事务。

而幂等性则是实现EOS的一个利器,Kafka为了实现幂等性,则是采用和TCP类似的工作方式,发送到broker端的每批消息都会赋予一个序列号,方便消息去重,这个序列号会被kafka保存在日志中,保证高可用,

其次是Kafka为每个producer实例分配了一个producer id,分配了一个PID,从而形成了一个Map

Map的Key是(PID,分区号),value为序列号,那么如果新来了一条数据,如果消息的序列号小于或者等于序列号,那么就拒接这次写入操作。

如果出现了重试操作,也会保证日志只记录一次。这就是producer端的消息去重。

对于整体的事务

是引入了一个唯一的id来表示事务,这个id是事务id,在所有的会话都是唯一的。

提供了TransactionaId,Kafka就可以确保

这个TransactionaId是由用户设置的,和PID不同的是,PID是Kafka内部分配的

故Producer不提交的事务,Consumer不会消费到

但是consumer角度来说,其可能遇到log segment被删除了,

Consumer可以使用seek方法来定位事务中的任意消息的情况。

而且Kafka引入了一类特殊消息,即控制消息control message,事物控制消息和普通Kafka消息一样,都是在消息属性中标记是不是control message.内部分为了两个类型,COMMIT,ABORT,表示事务提交和事物终止。方便consumer识别事物边界,读取下面所有的消息

图片

并在Kafka内部引入了新组件和新的内部topic

Transaction Coordinator

负责保存producer的事务状态,内部引入了一个新的topic,类似__consumer_offsets topic

这个组件来存储事务的最新状态,事物的状态和OnGoing,Prepare commit,Completed

而且每个transactional.id通过hash来对应的transaction log的一个分区,从而确定对应的broker.

Kafka提供了5个事务相关方法

initTransactions()**方法用来初始化事务,这个方法能够执行的前提是配置了transactionalId,如果没有则会报出IllegalStateException:

beginTransaction()**方法用来开启事务;

sendOffsetsToTransaction()**方法为消费者提供在事务内的位移提交的操作;

commitTransaction()**方法用来提交事务;

abortTransaction()**方法用来中止事务,类似于事务回滚。

而消费端的配置,可以配置isolation.level级别,分别为read_uncommitted和read_committed,分别来看到未提交的消息和已经提交的消息。这一点也是根据上面的控制消息来实现的。根据上面的消息来判断事务是否提交还是中止,从而将对应的消息返回consumer.

基本的api顺序调用为

Producer.initTransactions();

Try{

Producer.beginTransaction();

Producer.send(record);

Producer.send(record1);

Producer.commitTransaction();

}catch (KafkaException e){

Producer.abortTransaction();

}

首先是在initTransactions的时候,会将自己注册到transaction coordinator。然后这时候会关闭所有相同transaction id但是还在pending的事务。

其次是发送消息,这时候会先发送消息给transactional coordinator,保存诸如transaction id和partition的关系,然后producer 按照正常流程发送消息到目标 topic。

最后是提交或者回滚消息。这里采用了二阶段提交,第一阶段是“prepare_commit”,先记录进transaction log

然后写入控制消息到目标topic的目标partition,分别表示事物已经提交或者已经终止。

之后写入transaction log更新事务状态为 “commited” 或 “abort”。

最后我们给一个事物使用的基本代码:

public static void main(String[] args) {

KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(new Properties());

consumer.subscribe(Arrays.asList(“topic_source”));

KafkaProducer<String,String> producer = new KafkaProducer<String, String>(new Properties());

//初始化事务

producer.initTransactions();

while (true) {

ConsumerRecords<String, String> message = consumer.poll(Duration.ofMillis(1000));

if (!message.isEmpty()) {

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

//开启事务

producer.beginTransaction();

//遍历消息所在的分区

try{

for (TopicPartition partition : message.partitions()) {

//获取指定分区的消息信息

List<ConsumerRecord<String, String>> records = message.records(partition);

//遍历每一个分区拉去的消息

for (ConsumerRecord<String, String> record : records) {

//这里可以做,一些数据转换逻辑

//组装发送消息

ProducerRecord<String,String> producerRecord = new ProducerRecord<>(“topic-sink”,record.key()

,record.value());

//消费->生产

producer.send(producerRecord);

}

//获取提交的offset值

long lastConsumerOffset = records.get(records.size() – 1).offset();

offsets.put(partition,new OffsetAndMetadata(lastConsumerOffset));

}

//提交offset记录给事务

producer.sendOffsetsToTransaction(offsets,”group_id”);

producer.commitTransaction();

}catch (Exception e){

//log

//出现异常,终止事务

producer.abortTransaction();

}

}

}

}

最后要使用事务,需要的配置为:

producer 配置项更改:

enable.idempotence = true

acks = “all”

retries > 1 (preferably MAX_INT)

transactional.id = ‘some unique id’

consumer 配置项更改:

根据需要配置 isolation.level 为 “read_committed”, 或 “read_uncommitted”;

发表评论

邮箱地址不会被公开。 必填项已用*标注