之前已经说了,PullMessageService负责对消息队列进行消息的拉取,然后将消息存入ProcessQueue消息处理队列中,之后调用ConsumeMessageService的submitConsumeRequest进行消费,使用线程池进行消费消息,确保了消息队列和消息消费的解耦,RMQ使用ConsumeMessageService实现消息消费的处理逻辑,RMQ支持顺序消费和并发消费,这次我们先说并发的消费
ConsumeMessageService是一个接口,内部的接口列表有
其实现类有
我们就从ConsumeMessageConcurrentlyService下手了
从服务器拉取完成消息之后进行Callback的操作,将消息放入了ProcessQueue中,然后将消息放到消费线程中执行
其中就是调用了submitConsumeRquest函数
在其中分为了两种,分别是分页和不分页的区别
首先函数整体如下
public void submitConsumeRequest(
final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { for (int total = 0; total < msgs.size(); ) { List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } } |
上面首先是不分页的
//获取到一次消费的数量,一般是1
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); //如果小于等于了 if (msgs.size() <= consumeBatchSize) { //创建一个ConsumeRequest ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { //进行提交 this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } |
其次是分页的
else {
//进行分页 for (int total = 0; total < msgs.size(); ) { List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize); //每次从msgs中取出规定数量的msg for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { //添加进去 msgThis.add(msgs.get(total)); } else { break; } } //封装为Request ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); try { //提交 this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } |
然后我们进入到ConsumeRequest的run函数中
if (this.processQueue.isDropped()) {
log.info(“the message queue not be able to consume, because it’s dropped. group={} {}”, ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } |
首先看一下处理Queue是否已经被丢弃了,如果已经被丢弃了
则直接返回
然后尝试恢复主题名
//接下来,尝试恢复主题名
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); |
内部实现为:
//用于恢复主题名
public void resetRetryAndNamespace(final List<MessageExt> msgs, /*group名称*/String consumerGroup) { final String groupTopic = MixAll.getRetryTopic(consumerGroup); //获取到所有的msg,一般只有一个 for (MessageExt msg : msgs) { //如果在延迟topic这个属性中拿到了 String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); if (retryTopic != null && groupTopic.equals(msg.getTopic())) { //且校验成功 msg.setTopic(retryTopic); } if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) { msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace())); } } } |
然后走到消息消费的核心逻辑
//在上面执行过前置的钩子函数之后,执行实际的消费逻辑
long beginTimestamp = System.currentTimeMillis(); boolean hasException = false; ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; try { if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { //对每一个msg设置时间戳 MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } } //利用监听者进行消费获得状态 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn(“consumeMessage exception: {} Group: {} Msgs: {} MQ: {}”, RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } |
返回的结果无非就是 CONSUME_SUCCESS 和 RECONSUME_LATER
接下来会执行后置的钩子函数
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn(“processQueue is dropped without process consume result. messageQueue={}, msgs={}”, messageQueue, msgs); } |
钩子执行完成,会再次查看ProcessQueue是不是被抛弃了
但是对于已经被drop的,不做任何的处理,那么就会出现重复消费的情况
那么在其中的processConsumeResult函数,就是针对status的不同情况,进行消费的
switch (status) {
case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() – 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() – ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } |
根据status,计算ackIndex
如果是Success,那么就将ackIndex设置为msg.size() -1
如果有LATER,则直接将设置为-1
一般来说一个msgs中只有一条,那么就会返回ACK,然后根据result进行是否重新消费
case CLUSTERING:
//如果是集群模式 List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); //尝试遍历失败的消息,如果是success,那么就不会有消息出现 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); //然后提交ACK boolean result = this.sendMessageBack(msg, context); if (!result) { //不成功就只好考虑重新消费了 msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); //延迟一段时间重新消费 this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; |
最后更新偏移量
//获取到偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { //更新偏移量 this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } |
这里进行更新偏移量
如果有消费失败且ack失败的,则不增加消费的offset
这就说明,ack成功了,那么在Broker端也会重新生成一条新的消息,然后方便重新消费
拥有一个全新的队列偏移量
那么接下来,我们就会说明一下ACK的整体流程