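A scheduled (delayed) message is not consumed as soon as it reaches the Broker. It becomes available to consumers only after a period of time. RocketMQ (RMQ) does not support delays of arbitrary precision, because that would require sorting messages by delivery time and scheduling them in batches, which is expensive. Instead, it supports only a fixed set of delay levels, such as 1s 5s 10s 30s 1m 2m 3m 4m 5m and so on. Message retry in RocketMQ also relies on scheduled messages. When consumption fails, the message is redelivered after a delay rather than immediately, and the delay grows with each retry.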
Scheduled messages are implemented by ScheduleMessageService. DefaultMessageStore creates this class and then calls its load() and start() methods.
Let's go straight to its load() method:
```java
public boolean load() {
    // Load the delay-queue consumption offsets, which are stored in
    // ${ROCKET_HOME}/store/config/delayOffset.json
    boolean result = super.load();
    // Build delayLevelTable from the configured delay levels
    result = result && this.parseDelayLevel();
    return result;
}
```
The parseDelayLevel() method:
```java
public boolean parseDelayLevel() {
    // Time unit -> milliseconds
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);

    // Build the delay levels from messageDelayLevel, e.g. "1s 5s 10s 30s 1m ..."
    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            // The last character is the time unit
            String ch = value.substring(value.length() - 1);
            Long tu = timeUnitTable.get(ch);

            // Levels start at 1
            int level = i + 1;
            if (level > this.maxDelayLevel) {
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            long delayTimeMillis = tu * num;
            this.delayLevelTable.put(level, delayTimeMillis);
        }
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }

    return true;
}
```
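This method converts the delay levels defined by messageDelayLevel into delayLevelTable. The table maps each level, starting at 1, to its delay time in milliseconds.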
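The sketch below runs the same parsing logic outside RocketMQ, so you can see what delayLevelTable ends up containing. DelayLevelParser is a hypothetical name, and the code is not RocketMQ's. It also differs in one detail: it rejects an unknown unit explicitly, while the original lets the null unit fail inside its try block.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class DelayLevelParser {
    // Hypothetical standalone helper mirroring parseDelayLevel():
    // "1s 5s 10s" -> {1=1000, 2=5000, 3=10000} (level -> delay in ms)
    public static Map<Integer, Long> parse(String levelString) {
        Map<String, Long> timeUnitTable = new HashMap<>();
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);

        // TreeMap keeps the levels in ascending order when printed
        Map<Integer, Long> delayLevelTable = new TreeMap<>();
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            // The last character is the time unit
            Long unitMillis = timeUnitTable.get(value.substring(value.length() - 1));
            if (unitMillis == null) {
                throw new IllegalArgumentException("unknown time unit: " + value);
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            // Array index i corresponds to delay level i + 1
            delayLevelTable.put(i + 1, unitMillis * num);
        }
        return delayLevelTable;
    }

    public static void main(String[] args) {
        System.out.println(parse("1s 5s 10s 30s 1m 2m"));
        // prints {1=1000, 2=5000, 3=10000, 4=30000, 5=60000, 6=120000}
    }
}
```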
Next comes the start() method:
```java
this.timer = new Timer("ScheduleMessageTimerThread", true);

// Iterate over the delay levels
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
    Integer level = entry.getKey();
    // Delay of this level, in milliseconds
    Long timeDelay = entry.getValue();
    // Resume from the saved offset of this level, or from 0 if there is none
    Long offset = this.offsetTable.get(level);
    if (null == offset) {
        offset = 0L;
    }

    // Schedule a delivery task for this level; its first run happens after FIRST_DELAY_TIME (1s)
    if (timeDelay != null) {
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
    }
}
```
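There is one task per level because scheduled messages use a dedicated topic, SCHEDULE_TOPIC_XXXX. That topic has one queue for each delay level configured in MessageStoreConfig#messageDelayLevel.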
The mapping is queueId = delayLevel - 1.
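The level-to-queue mapping is plain arithmetic. The sketch below assumes the default messageDelayLevel string of RocketMQ 4.x, which has 18 levels from 1s to 2h; your broker configuration may override it. delayLevel2QueueId also appears in the source excerpt further down. queueId2DelayLevel is an illustrative inverse.

```java
public class DelayQueueMapping {
    // Assumed default messageDelayLevel value (RocketMQ 4.x); a broker may override it
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // queueId = delayLevel - 1 (levels start at 1, queue IDs at 0)
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    // Illustrative inverse mapping
    static int queueId2DelayLevel(int queueId) {
        return queueId + 1;
    }

    public static void main(String[] args) {
        // SCHEDULE_TOPIC_XXXX has one queue per configured level
        int queueCount = DEFAULT_LEVELS.split(" ").length;
        System.out.println("queues in SCHEDULE_TOPIC_XXXX: " + queueCount); // 18
        System.out.println("level 3 (10s) -> queueId " + delayLevel2QueueId(3)); // 2
    }
}
```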
```java
// Persist the delay-queue offsets: first after 10s, then every flushDelayOffsetInterval ms
this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            if (started.get()) {
                ScheduleMessageService.this.persist();
            }
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate flush exception", e);
        }
    }
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
```
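So start() does more than schedule the per-level delivery tasks. It also schedules a task that persists the offsets every 10s, which is the default value of flushDelayOffsetInterval.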
The code above also loops over the delay levels and creates one scheduled task per level. The task class is DeliverDelayedMessageTimerTask, so let's look at how it works.
Its run() method calls executeOnTimeup().
executeOnTimeup() works as follows:
1. Get the consume queue
```java
// Look up the consume queue by the delay topic and this level's queue ID;
// if none is found, there are no messages at this delay level
ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
        TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
```
2. If the queue exists, read and iterate over its entries
```java
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
// Iterate over the ConsumeQueue; bufferCQ holds its entries starting at offset
// (obtained earlier, not shown). Each entry is a fixed 20 bytes.
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    // Parse one entry: CommitLog physical offset, message size, tags code
    long offsetPy = bufferCQ.getByteBuffer().getLong();
    int sizePy = bufferCQ.getByteBuffer().getInt();
    long tagsCode = bufferCQ.getByteBuffer().getLong();

    // Check the tags code
    if (cq.isExtAddr(tagsCode)) {
        if (cq.getExt(tagsCode, cqExtUnit)) {
            tagsCode = cqExtUnit.getTagsCode();
        } else {
            // can't find ext content. So re compute tags code.
            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                tagsCode, offsetPy, sizePy);
            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
        }
    }
    // ... (the loop body continues in the steps below)
```
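You can reproduce the 20-byte entry layout that this loop reads with a plain java.nio.ByteBuffer. The sketch is illustrative, and CqEntryDemo and its helpers are hypothetical names. It writes two entries field by field (8-byte offset, 4-byte size, 8-byte tags code). It then decodes them with the same getLong/getInt/getLong sequence of relative reads, each of which advances the buffer position.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class CqEntryDemo {
    // One entry: 8-byte CommitLog offset + 4-byte message size + 8-byte tags code
    static final int CQ_STORE_UNIT_SIZE = 8 + 4 + 8; // = 20

    static final class Entry {
        final long offsetPy;
        final int sizePy;
        final long tagsCode;

        Entry(long offsetPy, int sizePy, long tagsCode) {
            this.offsetPy = offsetPy;
            this.sizePy = sizePy;
            this.tagsCode = tagsCode;
        }
    }

    // Encode rows of {offsetPy, sizePy, tagsCode} into a buffer ready for reading
    static ByteBuffer encode(long[][] rows) {
        ByteBuffer buf = ByteBuffer.allocate(rows.length * CQ_STORE_UNIT_SIZE);
        for (long[] r : rows) {
            buf.putLong(r[0]).putInt((int) r[1]).putLong(r[2]);
        }
        buf.flip(); // switch from writing to reading
        return buf;
    }

    // Walk the buffer 20 bytes at a time, like the loop above
    static List<Entry> decode(ByteBuffer buf) {
        List<Entry> entries = new ArrayList<>();
        int size = buf.remaining();
        for (int i = 0; i < size; i += CQ_STORE_UNIT_SIZE) {
            long offsetPy = buf.getLong();
            int sizePy = buf.getInt();
            long tagsCode = buf.getLong();
            entries.add(new Entry(offsetPy, sizePy, tagsCode));
        }
        return entries;
    }

    public static void main(String[] args) {
        ByteBuffer buf = encode(new long[][] {{0L, 200L, 1000L}, {200L, 180L, 6000L}});
        for (Entry e : decode(buf)) {
            System.out.println(e.offsetPy + " " + e.sizePy + " " + e.tagsCode);
        }
    }
}
```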
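3. The loop above also checks the tags code. For SCHEDULE_TOPIC_XXXX, the tags code in a ConsumeQueue entry is the message's delivery timestamp. If the extended entry cannot be found, the code recomputes the timestamp from the store time with computeDeliverTimestamp().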
4. After the tags check, read the actual message from the CommitLog
```java
// Read the message from the CommitLog by physical offset and size
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
        offsetPy, sizePy);
```
5. If the message is found, build a new message and write it to the CommitLog again so that it can be consumed
```java
// Only reached when msgExt != null (the message was found)
try {
    // Rebuild the message
    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
            msgInner.getTopic(), msgInner);
        continue;
    }
    // Write it to the CommitLog again; it is then dispatched to the original topic's queue for consumers
    PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore
        .putMessage(msgInner);

    if (putMessageResult != null
        && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
        continue;
    // ... (the failure branch is omitted here)
```
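The excerpt does not show messageTimeup(). In RocketMQ 4.x, when a delayed message is first stored, the broker presumably saves the original topic and queue ID in message properties. messageTimeup() then restores them and clears the delay level. The sketch below models that behavior with a plain Map. The key strings REAL_TOPIC, REAL_QID and DELAY are assumptions modeled on RocketMQ's MessageConst and should be checked against your version. Msg and restore are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class RestoreTopicDemo {
    // Assumed property keys (modeled on RocketMQ's MessageConst); verify against your version
    static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
    static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";

    // Hypothetical minimal message: topic, queue ID and string properties
    static final class Msg {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();
    }

    // Simplified analogue of messageTimeup(): copy the properties, clear the delay level,
    // and move the message back to its original topic and queue
    static Msg restore(Msg delayed) {
        Msg m = new Msg();
        m.properties.putAll(delayed.properties);
        m.properties.remove(PROPERTY_DELAY_TIME_LEVEL);
        m.topic = m.properties.get(PROPERTY_REAL_TOPIC);
        m.queueId = Integer.parseInt(m.properties.get(PROPERTY_REAL_QUEUE_ID));
        return m;
    }

    public static void main(String[] args) {
        Msg delayed = new Msg();
        delayed.topic = "SCHEDULE_TOPIC_XXXX";
        delayed.queueId = 2; // delay level 3
        delayed.properties.put(PROPERTY_REAL_TOPIC, "OrderTopic");
        delayed.properties.put(PROPERTY_REAL_QUEUE_ID, "1");
        delayed.properties.put(PROPERTY_DELAY_TIME_LEVEL, "3");

        Msg restored = restore(delayed);
        System.out.println(restored.topic + " / queue " + restored.queueId); // OrderTopic / queue 1
    }
}
```

Clearing the delay level matters. If it stayed set, the store would presumably treat the rebuilt message as delayed again and route it back to SCHEDULE_TOPIC_XXXX.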
6. If nothing went wrong, update the pull progress of the delay queue
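```java
// Compute the new progress (i counts bytes, so divide by the entry size),
// schedule the next run of this level after DELAY_FOR_A_WHILE, and save the offset
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
```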
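The progress stored for each level is a logical entry index, not a byte offset. The loop variable i counts bytes read from the ConsumeQueue buffer, so dividing it by the 20-byte entry size gives the number of entries processed. A small arithmetic check follows (NextOffsetDemo is a hypothetical helper):

```java
public class NextOffsetDemo {
    // Each ConsumeQueue entry is 20 bytes
    static final int CQ_STORE_UNIT_SIZE = 20;

    // offset: logical index of the first entry read; bytesRead: the loop variable i
    static long nextOffset(long offset, int bytesRead) {
        return offset + (bytesRead / CQ_STORE_UNIT_SIZE);
    }

    public static void main(String[] args) {
        // Started at entry 5 and processed 3 entries (60 bytes): the next run starts at entry 8
        System.out.println(nextOffset(5L, 3 * CQ_STORE_UNIT_SIZE)); // 8
    }
}
```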
To sum up:
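1. When a producer sends a message whose delayLevel is greater than 0, the broker changes its topic to SCHEDULE_TOPIC_XXXX and stores it in queue delayLevel - 1 of that topic.
2. The message is written to the CommitLog and dispatched to the SCHEDULE_TOPIC_XXXX consume queue.
3. For each level, a Timer task (first run after 1s, after which it reschedules itself) reads the pending entries from that level's consume queue, starting at the last saved offset.
4. The task fetches each message from the CommitLog by its physical offset and size.
5. It rebuilds the message from its properties, which hold the original topic and queue, and writes it to the CommitLog again.
6. The message is dispatched to the original topic's queue, where consumers can consume it.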
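This flow also shows why fixed levels avoid sorting. Every message in a level has the same delay, so storing messages in arrival order keeps each level's queue ordered by delivery time. The task only needs to read from its saved offset and stop at the first entry that is not yet due. In RocketMQ 4.x, the task then reschedules itself for the remaining countdown. The sketch below is a simplified, single-threaded, in-memory simulation of one level, not RocketMQ's implementation. DelayLevelSimulation and its members are hypothetical, and time is passed in explicitly instead of coming from a Timer.

```java
import java.util.ArrayList;
import java.util.List;

public class DelayLevelSimulation {
    // One entry in a level's queue: the original topic and its delivery timestamp (the tags code)
    static final class Entry {
        final String realTopic;
        final long deliverTimestamp;

        Entry(String realTopic, long deliverTimestamp) {
            this.realTopic = realTopic;
            this.deliverTimestamp = deliverTimestamp;
        }
    }

    final long delayMillis;                            // delay of this level
    final List<Entry> levelQueue = new ArrayList<>();  // stands in for one SCHEDULE_TOPIC_XXXX queue
    final List<String> delivered = new ArrayList<>();  // stands in for the original topics' queues
    long offset = 0;                                   // saved progress of this level

    DelayLevelSimulation(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Steps 1-2: append in store order; with one fixed delay, timestamps never decrease
    void put(String realTopic, long storeTime) {
        levelQueue.add(new Entry(realTopic, storeTime + delayMillis));
    }

    // Steps 3-6: one timer run, delivering due entries from the saved offset onward
    void runOnce(long now) {
        while (offset < levelQueue.size()) {
            Entry e = levelQueue.get((int) offset);
            if (e.deliverTimestamp > now) {
                // Not due yet; every later entry is due even later, so stop here
                return;
            }
            delivered.add(e.realTopic);
            offset++;
        }
    }

    public static void main(String[] args) {
        DelayLevelSimulation level3 = new DelayLevelSimulation(10_000L); // a 10s level
        level3.put("TopicA", 0L);
        level3.put("TopicB", 3_000L);
        level3.runOnce(9_999L);
        System.out.println(level3.delivered);  // []
        level3.runOnce(10_000L);
        System.out.println(level3.delivered);  // [TopicA]
        level3.runOnce(13_000L);
        System.out.println(level3.delivered);  // [TopicA, TopicB]
    }
}
```

Supporting arbitrary delays would break the "stop at the first entry that is not due" shortcut, because later entries could be due earlier. That is the sorting cost described at the start of this article.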