PullMessageService在启动的时候会从pullRequestQueue中拿去PullRequest

如果没有PullRequest,那么就会阻塞

那么会带来两个问题

1.PullRequest是什么时候放进去PullRequestQueue中以便唤醒PullMessageService的呢?

2.集群中多个消费者如何负载主题下的多个消费队列的,如果有新的消费者加入,如何重新分布的呢?

RMQ中消息队列的重新分布是由RebalacneService线程实现的,一个MQClientInstance上持有一个RebalanceService的实现,并且在run中启动

我们首先看一下RebalanceService的run函数

@Override

public void run() {

log.info(this.getServiceName() + ” service started”);

while (!this.isStopped()) {

this.waitForRunning(waitInterval);

this.mqClientFactory.doRebalance();

}

log.info(this.getServiceName() + ” service end”);

}

上面的函数一目了然,就是等待一个时间,然后进行重新的负载均衡

进入doRebalance函数中

public void doRebalance() {

for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {

MQConsumerInner impl = entry.getValue();

if (impl != null) {

try {

impl.doRebalance();

} catch (Throwable e) {

log.error(“doRebalance exception”, e);

}

}

}

}

从实现中可以看出,是遍历所有的消费者,并进行doRebalance方法

然后进入Consumer的内部

@Override

public void doRebalance() {

if (this.rebalanceImpl != null) {

this.rebalanceImpl.doRebalance(false);

}

}

每个Consumer都是持有了一个rebalanceImpl

在rebalanceImpl内部的doRebalance是遍历订阅信息对每个主题的队列进行重新负载

public void doRebalance(final boolean isOrder) {

//首先是获取到订阅信息

Map<String, SubscriptionData> subTable = this.getSubscriptionInner();

if (subTable != null) {

for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {

//以topic为粒度,进行重新负载

final String topic = entry.getKey();

try {

//这里的遍历,也会操作上面的sub信息

this.rebalanceByTopic(topic, isOrder);

} catch (Throwable e) {

if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

log.warn(“rebalanceByTopic Exception”, e);

}

}

}

}

this.truncateMessageQueueNotMyTopic();

}

然后进入Rebalance中

//根据是广播还是集群模式,进行分类

switch (messageModel) {

这里我们以集群为例进行讲解

//获取到所有的队列信息

Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

//然后获取这个消费组所有的客户端ID

//这里需要想到,此client无论请求哪个Broker都可以,因为一个MQclient会向所有Broker发送细腻套

List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

if (null == mqSet) {

if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

log.warn(“doRebalance, {}, but the topic[{}] not exist.”, consumerGroup, topic);

}

}

if (null == cidAll) {

log.warn(“doRebalance, {} {}, get consumer id list failed”, consumerGroup, topic);

}

//上述任意一个为空,则pass消费

if (mqSet != null && cidAll != null) {

List<MessageQueue> mqAll = new ArrayList<MessageQueue>();

mqAll.addAll(mqSet);

//进行排序,确定每一个消费组看到的视图一致

Collections.sort(mqAll);

Collections.sort(cidAll);

//获取到分配算法

AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

这里的分配算法主要有:

1.平均分配,就是先获取消费队列数量,然后平均分给每个消费者数量的

2.平均轮训分配,每个消费队列,轮询者分给每一个每个消费者

3.一致性Hash

4.根据配置,分配固定的队列

5.根据Broker部署机房名

整体思路就是,分配现在的队列集合,然后按照队列如果已经不在集合中,则停止消费,如果新添加的,则创建PullRequest,这就是第一个问题的解答

我们继续看代码

try {

//获取到新的队列

allocateResult = strategy.allocate(

this.consumerGroup,

this.mQClientFactory.getClientId(),

mqAll,

cidAll);

} catch (Throwable e) {

log.error(“AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}”, strategy.getName(),

e);

return;

}

Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();

if (allocateResult != null) {

allocateResultSet.addAll(allocateResult);

}

//进行更新队列信息

boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);

if (changed) {

log.info(

“rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}”,

strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),

allocateResultSet.size(), allocateResultSet);

this.messageQueueChanged(topic, mqSet, allocateResultSet);

}

}

在其中,对队列信息进行更新的是updateProcessQueueTableInRebalance函数

Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();

//获取到MessageQueue和ProcessQueue

while (it.hasNext()) {

Entry<MessageQueue, ProcessQueue> next = it.next();

MessageQueue mq = next.getKey();

ProcessQueue pq = next.getValue();

if (mq.getTopic().equals(topic)) {

//如果新获取的集合不包含,丢弃

if (!mqSet.contains(mq)) {

pq.setDropped(true);

//是否能够将MessageQueue和ProcessQueue从缓存表中移除

if (this.removeUnnecessaryMessageQueue(mq, pq)) {

it.remove();

changed = true;

log.info(“doRebalance, {}, remove unnecessary mq, {}”, consumerGroup, mq);

}

然后遍历这次分配的集合,我们要新增

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();

for (MessageQueue mq : mqSet) {

//如果不包含这一个,则是这一次新分配的消息队列

if (!this.processQueueTable.containsKey(mq)) {

if (isOrder && !this.lock(mq)) {

log.warn(“doRebalance, {}, add a new mq failed, {}, because lock failed”, consumerGroup, mq);

continue;

}

//

this.removeDirtyOffset(mq);

ProcessQueue pq = new ProcessQueue();

//获取消费进度

long nextOffset = this.computePullFromWhere(mq);

if (nextOffset >= 0) {

ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);

if (pre != null) {

log.info(“doRebalance, {}, mq already exists, {}”, consumerGroup, mq);

} else {

log.info(“doRebalance, {}, add a new mq, {}”, consumerGroup, mq);

PullRequest pullRequest = new PullRequest();

pullRequest.setConsumerGroup(consumerGroup);

pullRequest.setNextOffset(nextOffset);

pullRequest.setMessageQueue(mq);

pullRequest.setProcessQueue(pq);

pullRequestList.add(pullRequest);

changed = true;

}

} else {

log.warn(“doRebalance, {}, add new mq failed, {}”, consumerGroup, mq);

}

}

}

//分发PullRequest

this.dispatchPullRequest(pullRequestList);

return changed;

其中主要的是 computePullFromWhere 将nextOffset之前进行计算

这一步主要存在

RebalancePushImpl#computePullFromWhere 中

其中的Offset获取逻辑有

case CONSUME_FROM_LAST_OFFSET: {

//从最新的偏移量开始消费

long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);

//正常情况

if (lastOffset >= 0) {

result = lastOffset;

}

//第一次拉取文件

else if (-1 == lastOffset) {

if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

result = 0L;

} else {

try {

result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);

} catch (MQClientException e) {

result = -1;

}

}

} else {

//返回-1 表示存储了错误的数值

result = -1;

}

break;

}

case CONSUME_FROM_FIRST_OFFSET: {

//从头开始消费

long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);

if (lastOffset >= 0) {

//正常

result = lastOffset;

} else if (-1 == lastOffset) {

//第一次

result = 0L;

} else {

result = -1;

}

break;

}

case CONSUME_FROM_TIMESTAMP: {

//获取到最后一次消费进度

long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);

if (lastOffset >= 0) {

//正常

result = lastOffset;

} else if (-1 == lastOffset) {

//第一次

//如果支持重试

if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

try {

result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);

} catch (MQClientException e) {

result = -1;

}

} else {

try {

//获取消费者气动阀时间

long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),

UtilAll.YYYYMMDDHHMMSS).getTime();

//搜索offset

result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);

} catch (MQClientException e) {

result = -1;

}

}

} else {

result = -1;

}

break;

}

接着往下说,

就是组成完成PullRequest之后,我们会将其放入PullRequestList

即函数

//分发PullRequest

this.dispatchPullRequest(pullRequestList);

@Override

public void dispatchPullRequest(List<PullRequest> pullRequestList) {

for (PullRequest pullRequest : pullRequestList) {

this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);

log.info(“doRebalance, {}, add a new pull request {}”, consumerGroup, pullRequest);

}

}

将PullRequest放入到PullMessageService中,然后唤醒PullMessageService线程

PullRequest对象在什么时候创建并加入到PullRequestQueue

答案是RebalanceService每隔20S对消费者订阅的主题进行一次队列的重新分配,每一次分配都会获取主题的所有队列,然后如果有新加入的队列,则创建一个新的PullRequest对象

对于第二个问题

集群中多个消费者如何负载主题下的多个消费队列的,如果有新的消费者加入,如何重新分配

首先从Broker中查询出所以的消费者,并且对消费队列,消费者列表进行排序,新加入的消费者就会在队列重新分配的时候进行分配消费了

图片

发表评论

邮箱地址不会被公开。 必填项已用*标注