RocketMQ only supports partially ordered (per-queue) message consumption; if globally ordered consumption is required, the topic has to be configured with a single queue. The per-queue ordered consumption model follows the same overall phases as concurrent consumption: message queue load balancing (rebalance), message pulling, message consumption, and consumption-progress (offset) storage.
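Before going through the source, a minimal usage sketch of an ordered consumer may help as context. The group name, topic, and nameserver address below are placeholders chosen for illustration; registering a MessageListenerOrderly (rather than MessageListenerConcurrently) is what selects ConsumeMessageOrderlyService on the client.

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

public class OrderedConsumerDemo {
    public static void main(String[] args) throws Exception {
        // group, topic and nameserver address are made up for this sketch
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_order_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("DemoOrderTopic", "*");
        // an orderly listener: messages from the same queue are delivered to it one batch at a time, in order
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("queueId=%d offset=%d%n", msg.getQueueId(), msg.getQueueOffset());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}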
1. Message Queue Load Balancing
RocketMQ first performs message queue load balancing through the RebalanceService. In clustering mode, the consumers of a consumer group share the message queues of the subscribed topic, and at any given moment a message queue is consumed by only one consumer in the group.
A single consumer, however, may be assigned several message queues.
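How the queues are divided is decided by the configured AllocateMessageQueueStrategy. As a rough illustration only (a simplified sketch, not RocketMQ's exact implementation), an averaging strategy in the spirit of AllocateMessageQueueAveragely can be approximated as follows:

// Simplified sketch of an averaging allocation. Every client sorts mqAll and cidAll the same
// way, so each consumer can compute its own slice independently yet deterministically.
static List<MessageQueue> allocateAveragely(String currentCid, List<String> cidAll, List<MessageQueue> mqAll) {
    List<MessageQueue> result = new ArrayList<>();
    int index = cidAll.indexOf(currentCid);
    if (index < 0 || mqAll.isEmpty()) {
        return result;
    }
    int mod = mqAll.size() % cidAll.size();
    // the first `mod` consumers get one extra queue each
    int averageSize = mqAll.size() <= cidAll.size() ? 1
        : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get(startIndex + i));
    }
    return result;
}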
During this queue allocation, ordered consumption introduces an extra step:
if (isOrder && !this.lock(mq)) {
    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
    continue;
}
After rebalancing, for each newly assigned message queue the consumer first checks whether consumption is ordered and, if so, tries to lock the queue.
If locking fails, the queue is simply skipped and no PullRequest is created for it.
2. Message Pulling
Message pulling in RocketMQ is handled by the PullMessageService thread, which pulls in a loop.
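Stripped down, that loop is essentially a blocking queue of PullRequest objects; the consumer puts each request back (immediately or with a delay) after every pull, which keeps the loop going. A simplified sketch of PullMessageService#run:

@Override
public void run() {
    while (!this.isStopped()) {
        try {
            // block until a PullRequest is available, then hand it to the consumer implementation
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
            // woken up, loop again and re-check the stop flag
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
}

Inside DefaultMQPushConsumerImpl#pullMessage, the orderly-specific branch then looks like this: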
//This is the orderly consumption path
if (processQueue.isLocked()) {
    // the queue is locked; if this is the first pull after locking
    if (!pullRequest.isLockedFirst()) {
        // compute the offset to start pulling from
        final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
        boolean brokerBusy = offset < pullRequest.getNextOffset();
        log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
            pullRequest, offset, brokerBusy);
        if (brokerBusy) {
            log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                pullRequest, offset);
        }
        pullRequest.setLockedFirst(true);
        pullRequest.setNextOffset(offset);
    }
} else {
    // not locked yet: delay this pull by three seconds and try again
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.info("pull message later because not locked in broker, {}", pullRequest);
    return;
}
3. Message Consumption
ConsumeMessageOrderlyService
We start directly from this service's start method.
public void start() {
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            // periodically (every 20s, REBALANCE_LOCK_INTERVAL) lock the message queues assigned to
            // this consumer; consumption of queues that could not be locked will fail
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            }
        }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}
The overall locking flow is as follows.
Tracing lockMQPeriodically eventually leads to RebalanceImpl#lockAll.
Step one:
HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
buildProcessQueueTableByBrokerName groups the currently held queues into a per-broker mapping:
private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
    HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();
    for (MessageQueue mq : this.processQueueTable.keySet()) {
        // collect the MessageQueues belonging to each broker
        Set<MessageQueue> mqs = result.get(mq.getBrokerName());
        if (null == mqs) {
            mqs = new HashSet<MessageQueue>();
            result.put(mq.getBrokerName(), mqs);
        }
        mqs.add(mq);
    }
    // the result is a brokerName -> Set<MessageQueue> mapping
    return result;
}
Continuing on:
//Find the master node of the broker
FindBrokerResult findBrokerResult =
    this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
    // build the batch lock request
    LockBatchRequestBody requestBody = new LockBatchRequestBody();
    requestBody.setConsumerGroup(this.consumerGroup);
    requestBody.setClientId(this.mQClientFactory.getClientId());
    requestBody.setMqSet(mqs);
Iterating over the map built above, the client resolves the master node address of each broker, and then sends a batch lock request to that master to find out which queues it is allowed to consume.
try {
    // try to lock the queues in batch on the broker
    Set<MessageQueue> lockOKMQSet =
        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
Every MessageQueue contained in the returned set is then marked as locked:
//Iterate over the queues that were locked successfully
for (MessageQueue mq : lockOKMQSet) {
    ProcessQueue processQueue = this.processQueueTable.get(mq);
    if (processQueue != null) {
        if (!processQueue.isLocked()) {
            log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
        }
        // mark the queue as locked
        processQueue.setLocked(true);
        // record the last lock timestamp
        processQueue.setLastLockTimestamp(System.currentTimeMillis());
    }
}
Finally, every queue that was not granted the lock is marked as unlocked:
for (MessageQueue mq : mqs) {
    if (!lockOKMQSet.contains(mq)) {
        // queues not contained in the locked set are marked as unlocked
        ProcessQueue processQueue = this.processQueueTable.get(mq);
        if (processQueue != null) {
            processQueue.setLocked(false);
            log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
        }
    }
}
Next is the consume-task submission interface provided by ConsumeMessageOrderlyService:
@Override
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (dispathToConsume) {
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}
It simply creates a ConsumeRequest and submits it to the consume thread pool.
The interesting part is the ConsumeRequest's run method.
It first checks whether the process queue has been dropped:
if (this.processQueue.isDropped()) {
    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
    return;
}
Next it acquires the client-side lock object for this message queue:
//Fetch the lock object for this queue: a message queue is consumed by only one thread at a time
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    // in broadcasting mode the queue is not exclusively owned, so consume directly;
    // otherwise the broker lock must be held and not expired
    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
        final long beginTime = System.currentTimeMillis();
        for (boolean continueConsume = true; continueConsume; ) {
            if (this.processQueue.isDropped()) {
                log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                break;
            }
            // re-checked on every iteration, because consumption runs in a loop
            if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                && !this.processQueue.isLocked()) {
                log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                break;
            }
            if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                && this.processQueue.isLockExpired()) {
                log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                break;
            }
A single orderly consume request is bounded by time: once the loop has been running longer than MAX_TIME_CONSUME_CONTINUOUSLY (60 seconds by default), consumption stops and a new request is submitted later.
// the loop is time-bounded: if it has run longer than MAX_TIME_CONSUME_CONTINUOUSLY, stop and resubmit later
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
    break;
}
Then messages are taken and consumed:
// take up to consumeBatchSize messages; if none are available, consumption of this request ends
final int consumeBatchSize =
    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
The actual consumption:
// the actual orderly consumption
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
    // acquire the per-queue consume lock
    this.processQueue.getLockConsume().lock();
    // if the queue has been dropped, abandon consumption of this queue
    if (this.processQueue.isDropped()) {
        log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        break;
    }
    status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
        RemotingHelper.exceptionSimpleDesc(e),
        ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue);
    hasException = true;
} finally {
    this.processQueue.getLockConsume().unlock();
}
Afterwards, the return type is determined:
// derive the return type from the status
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
    if (hasException) {
        returnType = ConsumeReturnType.EXCEPTION;
    } else {
        returnType = ConsumeReturnType.RETURNNULL;
    }
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
    returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
    returnType = ConsumeReturnType.FAILED;
} else if (ConsumeOrderlyStatus.SUCCESS == status) {
    returnType = ConsumeReturnType.SUCCESS;
}
Finally, the consume result is processed according to the status:
// process the consume result according to the status
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
Inside processConsumeResult, the status determines whether the batch is committed:
case COMMIT:
case ROLLBACK:
    log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
        consumeRequest.getMessageQueue());
case SUCCESS:
    commitOffset = consumeRequest.getProcessQueue().commit();
    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup,
        consumeRequest.getMessageQueue().getTopic(), msgs.size());
    break;
The commit flow:
public long commit() {
    try {
        // acquire the write lock
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            // the largest offset among the in-flight messages
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            // subtract the number of in-flight messages
            msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
            for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
                // subtract each message's body size
                msgSize.addAndGet(0 - msg.getBody().length);
            }
            this.consumingMsgOrderlyTreeMap.clear();
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            // release the write lock
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }
    return -1;
}
In short, commit removes this batch from the ProcessQueue's in-flight map, adjusts msgCount and msgSize accordingly, clears the in-flight messages, and returns the next offset (lastKey + 1).
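commit's counterpart is ProcessQueue#takeMessages, which moves messages from the pending tree (msgTreeMap) into the in-flight tree (consumingMsgOrderlyTreeMap) that commit later clears. A simplified sketch of that method (details such as bookkeeping of timestamps are omitted):

public List<MessageExt> takeMessages(final int batchSize) {
    List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            for (int i = 0; i < batchSize && !this.msgTreeMap.isEmpty(); i++) {
                // pop the message with the smallest offset and mark it as in-flight
                Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                result.add(entry.getValue());
                this.consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("takeMessages exception", e);
    }
    return result;
}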
That is the path taken when consumption succeeds.
If consumption fails and the status marks the queue as "wait a moment and consume again" (SUSPEND_CURRENT_QUEUE_A_MOMENT), a retry has to be arranged:
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup,
        consumeRequest.getMessageQueue().getTopic(), msgs.size());
    if (checkReconsumeTimes(msgs)) {
        // retry is still possible; checkReconsumeTimes also handles sending messages back
        // to the broker (and, eventually, the dead-letter queue) when retries are exhausted
        consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs);
        // resubmit a delayed consume request
        this.submitConsumeRequestLater(
            consumeRequest.getProcessQueue(),
            consumeRequest.getMessageQueue(),
            context.getSuspendCurrentQueueTimeMillis());
        continueConsume = false;
    } else {
        commitOffset = consumeRequest.getProcessQueue().commit();
    }
    break;
The code above first checks the retry count: messages that have exhausted their retries are sent back to the broker (where they may eventually land in the dead-letter queue), while messages that can still be retried are put back into the ProcessQueue and the batch is re-submitted for delayed consumption. A sketch of the check follows.
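checkReconsumeTimes is roughly the following (a simplified sketch, not the exact source): it returns true when the batch should be suspended and retried locally, and tries to send messages whose retry count has reached the maximum back to the broker.

private boolean checkReconsumeTimes(List<MessageExt> msgs) {
    boolean suspend = false;
    if (msgs != null && !msgs.isEmpty()) {
        for (MessageExt msg : msgs) {
            if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
                // retries exhausted: try to send the message back to the broker;
                // if that fails, keep it locally and retry once more
                MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
                if (!sendMessageBack(msg)) {
                    suspend = true;
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                }
            } else {
                // retries remaining: suspend and consume again locally
                suspend = true;
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
            }
        }
    }
    return suspend;
}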
The corresponding preparation step before the retry is makeMessageToConsumeAgain:
public void makeMessageToConsumeAgain(List<MessageExt> msgs) {
    try {
        // acquire the write lock
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            for (MessageExt msg : msgs) {
                // remove the message from the in-flight map
                this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
                // and put it back into the pending map
                this.msgTreeMap.put(msg.getQueueOffset(), msg);
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("makeMessageToCosumeAgain exception", e);
    }
}
The consumer then waits for the suspend interval (suspendCurrentQueueTimeMillis, 1 second by default) before retrying.
Finally, the consumption progress is stored according to commitOffset (updateOffset updates the in-memory offset table; the OffsetStore persists it periodically):
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
To close, a brief look at how the message queue lock is implemented.
Every stage of orderly consumption revolves around the message queue (MessageQueue) and its processing queue (ProcessQueue). Before pulling and consuming, the client has to check whether the queue is locked; the lock itself is obtained by the consumer sending a lock request to the broker and receiving a successful response.
On the broker side, lock requests are handled by RebalanceLockManager.
Its important fields are:
REBALANCE_LOCK_MAX_LIVE_TIME
the maximum time a queue lock may live without being refreshed
ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable
the lock table, grouped by consumer group: each message queue maps to a lock entry recording which consumer in the group currently holds it
Its methods are:
tryLock
isLocked
tryLockBatch
unlockBatch
The important ones are tryLockBatch and unlockBatch.
Internally, though, both are just maintenance of the mqLockTable structure, so there is little to dig into; a simplified sketch of the idea is given below.
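The core idea can be captured in a few lines. The following is a simplified sketch (not the actual broker source): a queue is granted to a client if it is free, expired, or already held by that client, and granting refreshes the lock timestamp; only the current holder may release a lock.

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.rocketmq.common.message.MessageQueue;

// Simplified sketch of the broker-side queue lock table (not the real RebalanceLockManager)
public class SimpleQueueLockManager {
    // analogous to REBALANCE_LOCK_MAX_LIVE_TIME (configurable in the real broker)
    private static final long LOCK_MAX_LIVE_TIME = 60_000;

    private static class LockEntry {
        String clientId;
        long lastUpdateTimestamp = System.currentTimeMillis();

        boolean isExpired() {
            return System.currentTimeMillis() - lastUpdateTimestamp > LOCK_MAX_LIVE_TIME;
        }

        boolean isLockedBy(String cid) {
            return cid.equals(clientId) && !isExpired();
        }
    }

    // group -> (message queue -> lock entry)
    private final ConcurrentMap<String, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
        new ConcurrentHashMap<>();

    public synchronized Set<MessageQueue> tryLockBatch(String group, Set<MessageQueue> mqs, String clientId) {
        Set<MessageQueue> lockedMqs = new HashSet<>();
        ConcurrentHashMap<MessageQueue, LockEntry> groupTable =
            mqLockTable.computeIfAbsent(group, g -> new ConcurrentHashMap<>());
        for (MessageQueue mq : mqs) {
            LockEntry entry = groupTable.get(mq);
            // free or expired: take the lock over for this client
            if (entry == null || entry.isExpired()) {
                entry = new LockEntry();
                entry.clientId = clientId;
                groupTable.put(mq, entry);
            }
            // granted only if this client now holds a live lock; granting refreshes the timestamp
            if (entry.isLockedBy(clientId)) {
                entry.lastUpdateTimestamp = System.currentTimeMillis();
                lockedMqs.add(mq);
            }
        }
        return lockedMqs;
    }

    public synchronized void unlockBatch(String group, Set<MessageQueue> mqs, String clientId) {
        ConcurrentHashMap<MessageQueue, LockEntry> groupTable = mqLockTable.get(group);
        if (groupTable == null) {
            return;
        }
        for (MessageQueue mq : mqs) {
            LockEntry entry = groupTable.get(mq);
            // only the current holder may release the lock
            if (entry != null && clientId.equals(entry.clientId)) {
                groupTable.remove(mq);
            }
        }
    }
}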