那么我们按照上述的步骤,去走对应的源码

1.拉取请求的封装

整体入口在DefaultMQPushConsumerImpl中的pullMessage中

//获取到ProcessQueue

final ProcessQueue processQueue = pullRequest.getProcessQueue();

//如果没有丢弃这个队列

if (processQueue.isDropped()) {

log.info(“the pull request[{}] is dropped.”, pullRequest.toString());

return;

}

//进行设置这个队列的最后一次拉取时间戳

pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

//如果已经此消费者被挂起,延迟1s后再次放入队列中

if (this.isPause()) {

log.warn(“consumer was paused, execute pull request later. instanceName={}, group={}”, this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());

this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);

return;

}

//获取消息总数

long cachedMessageCount = processQueue.getMsgCount().get();

//类似的获取消息数量,然后算出个数

long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

//如果处理的消息条数超过了pullThresholdForQueue之后,放弃本次的拉取

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {

//延迟50ms后放入队列

this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);

if ((queueFlowControlTimes++ % 1000) == 0) {

log.warn(

“the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}”,

this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);

}

return;

}

//同上的计算

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {

this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);

if ((queueFlowControlTimes++ % 1000) == 0) {

log.warn(

“the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}”,

this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);

}

return;

}

if (!this.consumeOrderly) {

//获取到偏移量之间的间距,不能超过maxSpan

//并且进行输出

//避免一条消息阻塞,导致消息进度无法前进,大量重复消息消费

if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {

this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);

if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {

log.warn(

“the queue’s messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}”,

processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),

pullRequest, queueMaxSpanFlowControlTimes);

}

return;

}

}

//拉取主题订阅信息,如果是空的,那么就结束本次消息拉去

final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());

if (null == subscriptionData) {

//延迟拉取3s

this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);

log.warn(“find the consumer’s subscription failed, {}”, pullRequest);

return;

}

boolean commitOffsetEnable = false;

long commitOffsetValue = 0L;

//获取提交偏移量,因为只有集群的是保存在本地的

if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {

commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);

if (commitOffsetValue > 0) {

commitOffsetEnable = true;

}

}

if (sd != null) {

if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {

//消息过滤表达式的获取

subExpression = sd.getSubString();

}

classFilter = sd.isClassFilterMode();

}

//拼接sysFlag ,根据不同的条件是否满足,来进行拼接为一个int

int sysFlag = PullSysFlag.buildSysFlag(

commitOffsetEnable, // commitOffset

true, // suspend

subExpression != null, // subscription

classFilter // class filter

);

try {

//调用pullKernelImpl与服务器端进行交互

this.pullAPIWrapper.pullKernelImpl(

//获取到消息队列

pullRequest.getMessageQueue(),

//消息表达式

subExpression,

//消息表达式类型

subscriptionData.getExpressionType(),

subscriptionData.getSubVersion(),

//消息偏移量

pullRequest.getNextOffset(),

this.defaultMQPushConsumer.getPullBatchSize(),

//系统标记

sysFlag,

//当前MessageQueue的消息进度

commitOffsetValue,

BROKER_SUSPEND_MAX_TIME_MILLIS,

CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,

CommunicationMode.ASYNC,

//回调函数

pullCallback

);

}

上述就是整体的函数调用

分为了消息队列的获取和确认,保证消息处理的性能

构建请求,进行请求交互

从这个MQPushConsumerImpl的函数调用之后,是pullKernellmpl

在这个pullKernellmpl中,是负责和Broker进行交互的,首先需要看的是整个pullKernelImpl的参数

1.在其中,首先获取到Broerk的地址

//根据brokerName和BrokerId,从Instance中获取Broker地址,注意返回的内部属性

FindBrokerResult findBrokerResult =

this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),

this.recalculatePullFromWhichNode(mq), false);

if (null == findBrokerResult) {

this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());

findBrokerResult =

this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),

this.recalculatePullFromWhichNode(mq), false);

}

我们会进入这个findBroker的函数,查看其内部实现以及返回的FindBrokerResult的数据结构

HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);

//根据节点民称获取到所有的节点Id

if (map != null && !map.isEmpty()) {

brokerAddr = map.get(brokerId);

slave = brokerId != MixAll.MASTER_ID;

found = brokerAddr != null;

//如果没找到并且是Id是从节点

if (!found && slave) {

//从新获取

brokerAddr = map.get(brokerId + 1);

found = brokerAddr != null;

}

if (!found && !onlyThisBroker) {

Entry<Long, String> entry = map.entrySet().iterator().next();

brokerAddr = entry.getValue();

slave = entry.getKey() != MixAll.MASTER_ID;

found = true;

}

}

//返回数据结构,包含Broker地址,是否是从节点 Broker版本

if (found) {

return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));

}

接下来组装对应的请求头

PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();

requestHeader.setConsumerGroup(this.consumerGroup);

requestHeader.setTopic(mq.getTopic());

requestHeader.setQueueId(mq.getQueueId());

requestHeader.setQueueOffset(offset);

requestHeader.setMaxMsgNums(maxNums);

requestHeader.setSysFlag(sysFlagInner);

requestHeader.setCommitOffset(commitOffset);

requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);

requestHeader.setSubscription(subExpression);

requestHeader.setSubVersion(subVersion);

requestHeader.setExpressionType(expressionType);

然后进行实际的发送客户端的发送

String brokerAddr = findBrokerResult.getBrokerAddr();

if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {

//尝试替换brokerAddr

brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);

}

//实际利用MqClient发送pullRequest

PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(

brokerAddr,

requestHeader,

timeoutMillis,

communicationMode,

pullCallback);

到这里,基本的请求消息就封装完成了

发表评论

邮箱地址不会被公开。 必填项已用*标注