对于消息队列的消息可靠性保证,我们需要分阶段的去看这个问题

分为生产阶段 存储阶段 消费阶段

在说明这三个阶段消息丢失的解决方案之前,我们说一下如何检测消息丢失

对于消息队列,我们不能是消息丢了还不知道,如果是具有分布式链路追逐系统的公司,还是比较简单的

或者我们可以利用消息队列的有序性来验证消息是否丢失,我们在Producer端给每个消息加上一个连续的序号,然后Cosumer端进行检测

对于分布式的系统中,例如Kafka RocketMQ这种消息队列,需要保证分区,从而保证消息的有序性

还有就是Producer如果是多实例的话,需要在每个Producer生成各自的消息序号,加上Producer的标识

然后是这三个阶段的消息可靠性保证

图片

1.生产阶段,从Producer创建出来,经过网络传输到Broker端

这一阶段,我们需要确保Broker给我返回的ACK是否到达以及是否正确

也就是在编写相关的消息代码的时候,正确的返回返回值或者捕获异常,保证消息不丢失

比如如下的同步发送的代码

try {

RecordMetadata metadata = producer.send(record).get();

System.out.println(“消息发送成功。”);

} catch (Throwable e) {

System.out.println(“消息发送失败!”);

System.out.println(e);

}

异步发送的时候,需要在回调方法中检查,检查是否发送成功

producer.send(record, (metadata, exception) -> {

if (metadata != null) {

System.out.println(“消息发送成功。”);

} else {

System.out.println(“消息发送失败!”);

System.out.println(exception);

}

});

2.存储阶段,Borker自身保证消息是否丢失,如果Broker出现了故障

对于这一阶段,我们能做的暂时只有配置对应的Broker参数来避免宕机丢数据

比如单节点的Borker,我们可以设置先将消息存入磁盘在给Producer返回确认响应,这样即使宕机,也已经写入了磁盘,持久化了,比如RocketMQ中,大可以将刷盘方式 flushDiskType设置为SYNC_FLUSH刷盘

3.消费阶段,利用和生产阶段类似的方式保证可靠性,客户端从Broker中获取消息后,先执行用户的消费业务逻辑,成功后再返回消费确认响应,不要再收到消息后立刻发送确认

方便执行出错后,下次拉到的还是同一条消息

本章,我们说了消息队列如何保证可靠性,如何不丢失的

我们在确保编写的同时,还需要加上一些日志,方便定位问题,解决问题

对于Broker还是Cousumer来说,都是可能受到重复消息的,那么我们如何处理这种重复消息,不影响业务逻辑的正确性呢?

简单来说下实现,可以在生产者端来提供一个唯一标识,消费者端有识别并检查标识是否已经处理过的能力,比如在Reids或者本地维护一个集合来确保数据是否已经由处理,如果数据量过大,还可以利用布隆表达式来进行第一层的过滤

发表评论

邮箱地址不会被公开。 必填项已用*标注