由于一般情况下,我们往往利用Kafka这类中间件进行数据的清洗

将原始数据加工为更为上层,更加贴合业务的数据

图片

故需要一个Agent既当consumer ,又当Producer

这就要求,作为consumer拉取到数据后,需要开启一个事务,如果处理完成,那么就提交偏移量到server,表明自己作为consumer消费成功,然后将处理后的数据,作为生产者将数据发送出去,提交事务

需要注意,一般我们设置consumer的时候,一般都是设置enable.auto.commit 为true

这种情况表示自动提交,但对于手动提交,则是设置下面的配置

props.put(“enable.auto.commit”, “false”);

// 消费方式:从最新的位置读,还是从最早的位置读取数据

props.put(“auto.offset.reset”, “earliest”);

然后在业务代码相关的模块中

producer.initTransactions();

while (true) {

producer.beginTransaction();

try {

ConsumerRecords<String, String> records = consumer.poll(1000);

//System.out.println(“cnt = ” + records.count());

Map<TopicPartition, OffsetAndMetadata> map = new HashMap<>();

for (ConsumerRecord<String, String> record : records) {

map.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()));

}

// 将生产者和消费者融合在一起

producer.send(new ProducerRecord<String, String>(“itdachang-2”, “test”));

producer.sendOffsetsToTransaction(map, “itdachang”);

producer.commitTransaction();

} catch ( Exception e ) {

producer.abortTransaction();

} finally {

}

}

首先创建了一个事务管理器

然后在while中,producer开启了一个事务

Producer.beginTransaction()

之后分别拉取消息和处理消息,之后,根据是否成功,确定是否同步消息,以及提交事务

发表评论

邮箱地址不会被公开。 必填项已用*标注