Redis的Streams到消息队列

如果需要Redis作为消息队列,我们可以考虑什么数据类型呢?

因为消息队列的基本要求,消息要具有顺序,且能处理重复数据,和消息可靠性

在Redis中,可以作为实现需求的数据结构,包含List和Streams两个数据类型

如果基于List实现消息队列解决方案

生产者可以使用LPUSH命令将要发送的消息依次写入List,消费者则可以使用RPOP命令,从另一端读取消息进行处理

比如下面的操作,先用LPUSH写入了两条消息,分别是5和3,然后消费者利用RPOP将两个消息依次读出,进行相对应的处理

图片

不过需要注意,RPOP命令是不阻塞的,可能需要程序不停的进行调用RPOP命令获取结果

这会导致不必要的CPU消耗

对此Redis提供了BRPOP命令,也就是阻塞式读取,客户端在没有读到队列数据的时候,会自动的阻塞,直到有消息入队,节省CPU开销

其次是对于重复消息的问题,可以考虑给每一个消息提供一个全局唯一的ID号,当收到一个消息的时候,消费者程序就可以自行判断是否是重复消息,如果已经处理过了,消费者程序就不会进行处理了

对于List,则需要生产者自行生成一条消息放入List的Value中

最后就是消息的可靠性,就是当消费者在读取消息后没有处理成功并且宕机,那么这条消息就丢失了

对此List提供了BROPOPLPUSH命令,就是在读取List中的消息时候,会将这个消息插入到另一个List中,方便重新消费

基本的操作如下

图片

那么基于List,也可以实现一个基本的消息队列

然后Redis除此外,还提供了更加适合作为消息队列的数据结构

Streams就是这个数据类型,且提供了更加丰富的消息队列命令

XADD: 插入消息,保证有序,可以自动生成全局唯一ID

XREAD:用于读取消息,按照ID读取数据

XREADGROUP:按照消费组读取消息

XPENDING 和 XACK, XPENDING命令查询读取但未确认的消息,XACK用于向消息队列确认消息处理完成

首先是XADD,往消息队列中插入,插入的消息,会自动生成一个全局唯一的ID

比如下面命令,

XADD mqstream * repo 5

“1599203861727-0”

全局的唯一ID由两部分组成,第一部分1599203861727由时间戳 -0表示这个毫秒内的第一条消息

XREAD在读取消息的时候,可以指定一个消息ID,并且从这个消息的下一个消息开始读取

可执行如下的命令,获取指定消息后的消息

XREAD BLOCK 100 STREAMS  mqstream 1599203861727-0

其中的BLOCK配置项,就是实现了类似BRPOP的阻塞读取操作,可以配置阻塞的时长

然后可以不指定开始的id,直接使用$符号读取最新的消息

XREAD block 10000 streams mqstream $

Streams本身支持XGROUP创建消费组,创建完成,可以直接使用XREADGROUP来让消费组内的消费者读取消息

XGROUP create mqstream group1 0

然后,我们执行一个命令,让group1消费组的消费者consumer1从mqstream中读取所有消息,其中,命令最后的参数 “>”,从第一条尚未消费的消费开始读取

XREADGROUP group group1 consumer1 streams mqstream >

1) 1) “mqstream”

2) 1) 1) “1599203861727-0”

2) 1) “repo”

2) “5”

2) 1) “1599274912765-0”

2) 1) “repo”

2) “3”

3) 1) “1599274925823-0”

2) 1) “repo”

2) “2”

4) 1) “1599274927910-0”

2) 1) “repo”

2) “1”

还可以加入count 命令,控制每次消费的次数

而且为了保证消费者发生故障或者宕机重启之后,仍然可以读取未消费的消息,Streams自动使用内部队列,留存每个消费者读取的消息,直到获取到XACK

在其中,XPENDING命令可以获取到消费者已经读取,还没有确认的消息个数

XPENDING mqstream group2

1) (integer) 3

2) “1599203861727-0”

3) “1599274925823-0”

第二第三行,我们可以看到读取到的最大及最小的消息ID

如果需要查看某一个消息消费者具体读取了那些数据,可以执行如下的命令

XPENDING mqstream group2 – + 10 consumer2

可以查看读取的消息ID

如果一旦此消息已经被处理了,consumer2就可以使用XACK命令通知Streams

此消息会被删除

XACK mqstream group2 1599274912765-0

(integer) 1

XPENDING mqstream group2 – + 10 consumer2

(empty list or set)

这就是使用Streams实现消息队列的方法

那么我们总结和对比一下使用List和Streams实现消息队列

图片

但对于我个人来说,我认为Redis并不适合做消息队列,专业的事情要交给专业的人来做

最后一个小问题,如果一个消息需要被多个消费者消费,那么使用什么样的数据类型来解决呢?

我想如果有使用过MQ的话,这个问题应该遇到过,那就是分消费者组,然后就可以进行重复消费了

发表评论

邮箱地址不会被公开。 必填项已用*标注