Redis的Streams到消息队列
如果需要Redis作为消息队列,我们可以考虑什么数据类型呢?
因为消息队列的基本要求,消息要具有顺序,且能处理重复数据,和消息可靠性
在Redis中,可以作为实现需求的数据结构,包含List和Streams两个数据类型
如果基于List实现消息队列解决方案
生产者可以使用LPUSH命令将要发送的消息依次写入List,消费者则可以使用RPOP命令,从另一端读取消息进行处理
比如下面的操作,先用LPUSH写入了两条消息,分别是5和3,然后消费者利用RPOP将两个消息依次读出,进行相对应的处理
不过需要注意,RPOP命令是不阻塞的,可能需要程序不停的进行调用RPOP命令获取结果
这会导致不必要的CPU消耗
对此Redis提供了BRPOP命令,也就是阻塞式读取,客户端在没有读到队列数据的时候,会自动的阻塞,直到有消息入队,节省CPU开销
其次是对于重复消息的问题,可以考虑给每一个消息提供一个全局唯一的ID号,当收到一个消息的时候,消费者程序就可以自行判断是否是重复消息,如果已经处理过了,消费者程序就不会进行处理了
对于List,则需要生产者自行生成一条消息放入List的Value中
最后就是消息的可靠性,就是当消费者在读取消息后没有处理成功并且宕机,那么这条消息就丢失了
对此List提供了BROPOPLPUSH命令,就是在读取List中的消息时候,会将这个消息插入到另一个List中,方便重新消费
基本的操作如下
那么基于List,也可以实现一个基本的消息队列
然后Redis除此外,还提供了更加适合作为消息队列的数据结构
Streams就是这个数据类型,且提供了更加丰富的消息队列命令
XADD: 插入消息,保证有序,可以自动生成全局唯一ID
XREAD:用于读取消息,按照ID读取数据
XREADGROUP:按照消费组读取消息
XPENDING 和 XACK, XPENDING命令查询读取但未确认的消息,XACK用于向消息队列确认消息处理完成
首先是XADD,往消息队列中插入,插入的消息,会自动生成一个全局唯一的ID
比如下面命令,
XADD mqstream * repo 5
“1599203861727-0”
全局的唯一ID由两部分组成,第一部分1599203861727由时间戳 -0表示这个毫秒内的第一条消息
XREAD在读取消息的时候,可以指定一个消息ID,并且从这个消息的下一个消息开始读取
可执行如下的命令,获取指定消息后的消息
XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
其中的BLOCK配置项,就是实现了类似BRPOP的阻塞读取操作,可以配置阻塞的时长
然后可以不指定开始的id,直接使用$符号读取最新的消息
XREAD block 10000 streams mqstream $
Streams本身支持XGROUP创建消费组,创建完成,可以直接使用XREADGROUP来让消费组内的消费者读取消息
XGROUP create mqstream group1 0
然后,我们执行一个命令,让group1消费组的消费者consumer1从mqstream中读取所有消息,其中,命令最后的参数 “>”,从第一条尚未消费的消费开始读取
XREADGROUP group group1 consumer1 streams mqstream >
1) 1) “mqstream” 2) 1) 1) “1599203861727-0” 2) 1) “repo” 2) “5” 2) 1) “1599274912765-0” 2) 1) “repo” 2) “3” 3) 1) “1599274925823-0” 2) 1) “repo” 2) “2” 4) 1) “1599274927910-0” 2) 1) “repo” 2) “1” |
还可以加入count 命令,控制每次消费的次数
而且为了保证消费者发生故障或者宕机重启之后,仍然可以读取未消费的消息,Streams自动使用内部队列,留存每个消费者读取的消息,直到获取到XACK
在其中,XPENDING命令可以获取到消费者已经读取,还没有确认的消息个数
XPENDING mqstream group2
1) (integer) 3 2) “1599203861727-0” 3) “1599274925823-0” |
第二第三行,我们可以看到读取到的最大及最小的消息ID
如果需要查看某一个消息消费者具体读取了那些数据,可以执行如下的命令
XPENDING mqstream group2 – + 10 consumer2
可以查看读取的消息ID
如果一旦此消息已经被处理了,consumer2就可以使用XACK命令通知Streams
此消息会被删除
XACK mqstream group2 1599274912765-0
(integer) 1 XPENDING mqstream group2 – + 10 consumer2 (empty list or set) |
这就是使用Streams实现消息队列的方法
那么我们总结和对比一下使用List和Streams实现消息队列
但对于我个人来说,我认为Redis并不适合做消息队列,专业的事情要交给专业的人来做
最后一个小问题,如果一个消息需要被多个消费者消费,那么使用什么样的数据类型来解决呢?
我想如果有使用过MQ的话,这个问题应该遇到过,那就是分消费者组,然后就可以进行重复消费了