定时消息指的是消息发送到Broker之后,并不立刻被消费者消费,而是等待一段时间之后才能进行消费,RMQ并不支持任意时间精度的消费,而如果需要支持的话,则需要进行消息的排序,然后进行分批的调度,这会带来大量的性能消耗,于是只支持特定级别的延迟消息,常见的有 1s 5s 10s 30s 1m 2m 3m 4m 5m 依次下去,而且在RMQ中,消息重试也是依赖于定时消息,即出现故障后,定时5m后消费

定时消息实现类为ScheduleMessageService,此类由DefaultMessageStore中创建,然后由DefaultMessaegService load后start起来

我们直接看其的load方法

public boolean load() {

boolean result = super.load();

//完成消息进度的加载和delayLevelTable数据的构造,延迟队列消息消费进度

//存储在${ROCKET_HOME}/store/config/delayOffset.json

result = result && this.parseDelayLevel();

return result;

}

在parseDelayLevel()

public boolean parseDelayLevel() {

HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();

timeUnitTable.put(“s”, 1000L);

timeUnitTable.put(“m”, 1000L * 60);

timeUnitTable.put(“h”, 1000L * 60 * 60);

timeUnitTable.put(“d”, 1000L * 60 * 60 * 24);

//构造DelayLevel

String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();

try {

String[] levelArray = levelString.split(” “);

for (int i = 0; i < levelArray.length; i++) {

String value = levelArray[i];

String ch = value.substring(value.length() – 1);

Long tu = timeUnitTable.get(ch);

int level = i + 1;

if (level > this.maxDelayLevel) {

this.maxDelayLevel = level;

}

long num = Long.parseLong(value.substring(0, value.length() – 1));

long delayTimeMillis = tu * num;

this.delayLevelTable.put(level, delayTimeMillis);

}

} catch (Exception e) {

log.error(“parseDelayLevel exception”, e);

log.info(“levelString String = {}”, levelString);

return false;

}

return true;

}

将MessageDelayLevel定义的延迟级别转换为Map,获取到对应的延迟时间

然后走进Start方法

this.timer = new Timer(“ScheduleMessageTimerThread”, true);

//获取不同的延迟级别

for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {

Integer level = entry.getKey();

//具体的秒数

Long timeDelay = entry.getValue();

Long offset = this.offsetTable.get(level);

if (null == offset) {

offset = 0L;

}

//如果存在,那么就启动一个定时任务,第一默认1s后执行

if (timeDelay != null) {

this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);

}

}

原因为,定时消息单独一个主题,SCHEDULE_TOPIC_XXXX 主题下的队列数量为MessagStoreConfig#messageDelayLevel配置的延迟级别数量

对应关系为queueId 等于 延迟级别 减一

//每隔10s进行一次持久化的操作

this.timer.scheduleAtFixedRate(new TimerTask() {

@Override

public void run() {

try {

if (started.get()) ScheduleMessageService.this.persist();

} catch (Throwable e) {

log.error(“scheduleAtFixedRate flush exception”, e);

}

}

}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());

并且开启一个每隔10s

并且可以看出,上述的代码中,我们循环的,为每一个延迟级别都创建了一个调度任务,调度任务的实现类就是DeliverDelayedMessageTimerTask,我们就看一下其内部实现

其内部首先在run中调用了executeOnTimeup()

executeOnTimeup()内部实现如下

1.获取消费队列

//首先根据队列ID和延迟主题,获取到对应的消息消费队列,没有查找到,说明没有这个延迟级别的消息

ConsumeQueue cq =

ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,

delayLevel2QueueId(delayLevel));

long failScheduleOffset = offset;

2.如果不为空,则说明找到了,读取遍历ConsumeQueue

long nextOffset = offset;

int i = 0;

//遍历ConsumeQueue

ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();

//每个ConsumeQueue中条目标准为20个字节

for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {

//解析ConsumeQueue

long offsetPy = bufferCQ.getByteBuffer().getLong();

int sizePy = bufferCQ.getByteBuffer().getInt();

long tagsCode = bufferCQ.getByteBuffer().getLong();

//进行关于tags的校验

if (cq.isExtAddr(tagsCode)) {

if (cq.getExt(tagsCode, cqExtUnit)) {

tagsCode = cqExtUnit.getTagsCode();

} else {

//can’t find ext content.So re compute tags code.

log.error(“[BUG] can’t find consume queue extend file content!addr={}, offsetPy={}, sizePy={}”,

tagsCode, offsetPy, sizePy);

long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);

tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);

}

}

上面同时还会校验tags

4.校验完成tags,从commitLog文件中,读取出实际的Msg

//从CommitLog读取对应的Message

MessageExt msgExt =

ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(

offsetPy, sizePy);

5.如果找到了,构建一个新的Msg,并放入commitlog中,方便重新消费

/必须确保找到了

try {

//重新构建一个msg

MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);

if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {

log.error(“[BUG] the real topic of schedule msg is {}, discard the msg. msg={}”,

msgInner.getTopic(), msgInner);

continue;

}

//再次存入commitLog,转发到主题对应的消息队列,供消费者消费

PutMessageResult putMessageResult =

ScheduleMessageService.this.writeMessageStore

.putMessage(msgInner);

if (putMessageResult != null

&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {

continue;

6.在全程没有出现问题的情况下,更新延迟队列拉取进度

//进行更新消费进度

nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(

this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);

ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);

return;

那么整体总结一下

1.消息消费者发送消息,如果消息的delayLevel大于0,则改变topic为SCHEDULE_TOPIC_XXX,放入消息队列中,延迟的消息队列为delayLevel-1

2.消息经由commitlog转发到消息消费队列SCHEDULE_TOPIC_XXX消息消费队列

3.定时任务Time每隔1s,根据上次拉取的偏移量从消息队列中取出所有的消息

4.根据物理偏移量和消息大小从CommitLog中拉取消息

5.根据消息属性重新创建消息,存入commitLog中

6.转发到原topic队列中,以便消费

发表评论

邮箱地址不会被公开。 必填项已用*标注