之前已经说了,PullMessageService负责对消息队列进行消息的拉取,然后将消息存入ProcessQueue消息处理队列中,之后调用ConsumeMessageService的submitConsumeRequest进行消费,使用线程池进行消费消息,确保了消息队列和消息消费的解耦,RMQ使用ConsumeMessageService实现消息消费的处理逻辑,RMQ支持顺序消费和并发消费,这次我们先说并发的消费

ConsumeMessageService是一个接口,内部的接口列表有

图片

其实现类有

图片

我们就从ConsumeMessageConcurrentlyService下手了

从服务器拉取完成消息之后进行Callback的操作,将消息放入了ProcessQueue中,然后将消息放到消费线程中执行

其中就是调用了submitConsumeRquest函数

在其中分为了两种,分别是分页和不分页的区别

首先函数整体如下

public void submitConsumeRequest(

final List<MessageExt> msgs,

final ProcessQueue processQueue,

final MessageQueue messageQueue,

final boolean dispatchToConsume) {

final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

if (msgs.size() <= consumeBatchSize) {

ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);

try {

this.consumeExecutor.submit(consumeRequest);

} catch (RejectedExecutionException e) {

this.submitConsumeRequestLater(consumeRequest);

}

} else {

for (int total = 0; total < msgs.size(); ) {

List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);

for (int i = 0; i < consumeBatchSize; i++, total++) {

if (total < msgs.size()) {

msgThis.add(msgs.get(total));

} else {

break;

}

}

ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);

try {

this.consumeExecutor.submit(consumeRequest);

} catch (RejectedExecutionException e) {

for (; total < msgs.size(); total++) {

msgThis.add(msgs.get(total));

}

this.submitConsumeRequestLater(consumeRequest);

}

}

}

}

上面首先是不分页的

//获取到一次消费的数量,一般是1

final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

//如果小于等于了

if (msgs.size() <= consumeBatchSize) {

//创建一个ConsumeRequest

ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);

try {

//进行提交

this.consumeExecutor.submit(consumeRequest);

} catch (RejectedExecutionException e) {

this.submitConsumeRequestLater(consumeRequest);

}

其次是分页的

else {

//进行分页

for (int total = 0; total < msgs.size(); ) {

List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);

//每次从msgs中取出规定数量的msg

for (int i = 0; i < consumeBatchSize; i++, total++) {

if (total < msgs.size()) {

//添加进去

msgThis.add(msgs.get(total));

} else {

break;

}

}

//封装为Request

ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);

try {

//提交

this.consumeExecutor.submit(consumeRequest);

} catch (RejectedExecutionException e) {

for (; total < msgs.size(); total++) {

msgThis.add(msgs.get(total));

}

this.submitConsumeRequestLater(consumeRequest);

}

}

}

然后我们进入到ConsumeRequest的run函数中

if (this.processQueue.isDropped()) {

log.info(“the message queue not be able to consume, because it’s dropped. group={} {}”, ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);

return;

}

首先看一下处理Queue是否已经被丢弃了,如果已经被丢弃了

则直接返回

然后尝试恢复主题名

//接下来,尝试恢复主题名

defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

内部实现为:

//用于恢复主题名

public void resetRetryAndNamespace(final List<MessageExt> msgs, /*group名称*/String consumerGroup) {

final String groupTopic = MixAll.getRetryTopic(consumerGroup);

//获取到所有的msg,一般只有一个

for (MessageExt msg : msgs) {

//如果在延迟topic这个属性中拿到了

String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);

if (retryTopic != null && groupTopic.equals(msg.getTopic())) {

//且校验成功

msg.setTopic(retryTopic);

}

if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {

msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));

}

}

}

然后走到消息消费的核心逻辑

//在上面执行过前置的钩子函数之后,执行实际的消费逻辑

long beginTimestamp = System.currentTimeMillis();

boolean hasException = false;

ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;

try {

if (msgs != null && !msgs.isEmpty()) {

for (MessageExt msg : msgs) {

//对每一个msg设置时间戳

MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));

}

}

//利用监听者进行消费获得状态

status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

} catch (Throwable e) {

log.warn(“consumeMessage exception: {} Group: {} Msgs: {} MQ: {}”,

RemotingHelper.exceptionSimpleDesc(e),

ConsumeMessageConcurrentlyService.this.consumerGroup,

msgs,

messageQueue);

hasException = true;

}

返回的结果无非就是 CONSUME_SUCCESS 和 RECONSUME_LATER

接下来会执行后置的钩子函数

if (!processQueue.isDropped()) {

ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);

} else {

log.warn(“processQueue is dropped without process consume result. messageQueue={}, msgs={}”, messageQueue, msgs);

}

钩子执行完成,会再次查看ProcessQueue是不是被抛弃了

但是对于已经被drop的,不做任何的处理,那么就会出现重复消费的情况

那么在其中的processConsumeResult函数,就是针对status的不同情况,进行消费的

switch (status) {

case CONSUME_SUCCESS:

if (ackIndex >= consumeRequest.getMsgs().size()) {

ackIndex = consumeRequest.getMsgs().size() – 1;

}

int ok = ackIndex + 1;

int failed = consumeRequest.getMsgs().size() – ok;

this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);

this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);

break;

case RECONSUME_LATER:

ackIndex = -1;

this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),

consumeRequest.getMsgs().size());

break;

default:

break;

}

根据status,计算ackIndex

如果是Success,那么就将ackIndex设置为msg.size() -1

如果有LATER,则直接将设置为-1

一般来说一个msgs中只有一条,那么就会返回ACK,然后根据result进行是否重新消费

case CLUSTERING:

//如果是集群模式

List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());

//尝试遍历失败的消息,如果是success,那么就不会有消息出现

for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {

MessageExt msg = consumeRequest.getMsgs().get(i);

//然后提交ACK

boolean result = this.sendMessageBack(msg, context);

if (!result) {

//不成功就只好考虑重新消费了

msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);

msgBackFailed.add(msg);

}

}

if (!msgBackFailed.isEmpty()) {

consumeRequest.getMsgs().removeAll(msgBackFailed);

//延迟一段时间重新消费

this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());

}

break;

最后更新偏移量

//获取到偏移量

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());

if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {

//更新偏移量

this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);

}

这里进行更新偏移量

如果有消费失败且ack失败的,则不增加消费的offset

这就说明,ack成功了,那么在Broker端也会重新生成一条新的消息,然后方便重新消费

拥有一个全新的队列偏移量

那么接下来,我们就会说明一下ACK的整体流程

发表评论

邮箱地址不会被公开。 必填项已用*标注