PullMessageService在启动的时候会从pullRequestQueue中拿去PullRequest
如果没有PullRequest,那么就会阻塞
那么会带来两个问题
1.PullRequest是什么时候放进去PullRequestQueue中以便唤醒PullMessageService的呢?
2.集群中多个消费者如何负载主题下的多个消费队列的,如果有新的消费者加入,如何重新分布的呢?
RMQ中消息队列的重新分布是由RebalacneService线程实现的,一个MQClientInstance上持有一个RebalanceService的实现,并且在run中启动
我们首先看一下RebalanceService的run函数
@Override
public void run() { log.info(this.getServiceName() + ” service started”); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + ” service end”); } |
上面的函数一目了然,就是等待一个时间,然后进行重新的负载均衡
进入doRebalance函数中
public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error(“doRebalance exception”, e); } } } } |
从实现中可以看出,是遍历所有的消费者,并进行doRebalance方法
然后进入Consumer的内部
@Override
public void doRebalance() { if (this.rebalanceImpl != null) { this.rebalanceImpl.doRebalance(false); } } |
每个Consumer都是持有了一个rebalanceImpl
在rebalanceImpl内部的doRebalance是遍历订阅信息对每个主题的队列进行重新负载
public void doRebalance(final boolean isOrder) {
//首先是获取到订阅信息 Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { //以topic为粒度,进行重新负载 final String topic = entry.getKey(); try { //这里的遍历,也会操作上面的sub信息 this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn(“rebalanceByTopic Exception”, e); } } } } this.truncateMessageQueueNotMyTopic(); } |
然后进入Rebalance中
//根据是广播还是集群模式,进行分类
switch (messageModel) {
这里我们以集群为例进行讲解
//获取到所有的队列信息
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); //然后获取这个消费组所有的客户端ID //这里需要想到,此client无论请求哪个Broker都可以,因为一个MQclient会向所有Broker发送细腻套 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (null == mqSet) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn(“doRebalance, {}, but the topic[{}] not exist.”, consumerGroup, topic); } } if (null == cidAll) { log.warn(“doRebalance, {} {}, get consumer id list failed”, consumerGroup, topic); } //上述任意一个为空,则pass消费 if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); //进行排序,确定每一个消费组看到的视图一致 Collections.sort(mqAll); Collections.sort(cidAll); //获取到分配算法 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; |
这里的分配算法主要有:
1.平均分配,就是先获取消费队列数量,然后平均分给每个消费者数量的
2.平均轮训分配,每个消费队列,轮询者分给每一个每个消费者
3.一致性Hash
4.根据配置,分配固定的队列
5.根据Broker部署机房名
整体思路就是,分配现在的队列集合,然后按照队列如果已经不在集合中,则停止消费,如果新添加的,则创建PullRequest,这就是第一个问题的解答
我们继续看代码
try {
//获取到新的队列 allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error(“AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}”, strategy.getName(), e); return; } Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } //进行更新队列信息 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( “rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}”, strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); this.messageQueueChanged(topic, mqSet, allocateResultSet); } } |
在其中,对队列信息进行更新的是updateProcessQueueTableInRebalance函数
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
//获取到MessageQueue和ProcessQueue while (it.hasNext()) { Entry<MessageQueue, ProcessQueue> next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); if (mq.getTopic().equals(topic)) { //如果新获取的集合不包含,丢弃 if (!mqSet.contains(mq)) { pq.setDropped(true); //是否能够将MessageQueue和ProcessQueue从缓存表中移除 if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.info(“doRebalance, {}, remove unnecessary mq, {}”, consumerGroup, mq); } |
然后遍历这次分配的集合,我们要新增
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) { //如果不包含这一个,则是这一次新分配的消息队列 if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn(“doRebalance, {}, add a new mq failed, {}, because lock failed”, consumerGroup, mq); continue; } // this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); //获取消费进度 long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info(“doRebalance, {}, mq already exists, {}”, consumerGroup, mq); } else { log.info(“doRebalance, {}, add a new mq, {}”, consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn(“doRebalance, {}, add new mq failed, {}”, consumerGroup, mq); } } } //分发PullRequest this.dispatchPullRequest(pullRequestList); return changed; |
其中主要的是 computePullFromWhere 将nextOffset之前进行计算
这一步主要存在
RebalancePushImpl#computePullFromWhere 中
其中的Offset获取逻辑有
case CONSUME_FROM_LAST_OFFSET: {
//从最新的偏移量开始消费 long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); //正常情况 if (lastOffset >= 0) { result = lastOffset; } //第一次拉取文件 else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { result = 0L; } else { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } } else { //返回-1 表示存储了错误的数值 result = -1; } break; } case CONSUME_FROM_FIRST_OFFSET: { //从头开始消费 long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { //正常 result = lastOffset; } else if (-1 == lastOffset) { //第一次 result = 0L; } else { result = -1; } break; } case CONSUME_FROM_TIMESTAMP: { //获取到最后一次消费进度 long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { //正常 result = lastOffset; } else if (-1 == lastOffset) { //第一次 //如果支持重试 if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } else { try { //获取消费者气动阀时间 long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS).getTime(); //搜索offset result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; } |
接着往下说,
就是组成完成PullRequest之后,我们会将其放入PullRequestList
即函数
//分发PullRequest
this.dispatchPullRequest(pullRequestList);
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info(“doRebalance, {}, add a new pull request {}”, consumerGroup, pullRequest); } } |
将PullRequest放入到PullMessageService中,然后唤醒PullMessageService线程
PullRequest对象在什么时候创建并加入到PullRequestQueue
答案是RebalanceService每隔20S对消费者订阅的主题进行一次队列的重新分配,每一次分配都会获取主题的所有队列,然后如果有新加入的队列,则创建一个新的PullRequest对象
对于第二个问题
集群中多个消费者如何负载主题下的多个消费队列的,如果有新的消费者加入,如何重新分配
首先从Broker中查询出所以的消费者,并且对消费队列,消费者列表进行排序,新加入的消费者就会在队列重新分配的时候进行分配消费了