那么我们按照上述的步骤,去走对应的源码
1.拉取请求的封装
整体入口在DefaultMQPushConsumerImpl中的pullMessage中
//获取到ProcessQueue
final ProcessQueue processQueue = pullRequest.getProcessQueue(); //如果没有丢弃这个队列 if (processQueue.isDropped()) { log.info(“the pull request[{}] is dropped.”, pullRequest.toString()); return; } //进行设置这个队列的最后一次拉取时间戳 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); |
//如果已经此消费者被挂起,延迟1s后再次放入队列中
if (this.isPause()) { log.warn(“consumer was paused, execute pull request later. instanceName={}, group={}”, this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); return; } |
//获取消息总数
long cachedMessageCount = processQueue.getMsgCount().get(); //类似的获取消息数量,然后算出个数 long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); //如果处理的消息条数超过了pullThresholdForQueue之后,放弃本次的拉取 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { //延迟50ms后放入队列 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( “the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}”, this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } //同上的计算 if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( “the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}”, this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } |
if (!this.consumeOrderly) {
//获取到偏移量之间的间距,不能超过maxSpan //并且进行输出 //避免一条消息阻塞,导致消息进度无法前进,大量重复消息消费 if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( “the queue’s messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}”, processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes); } return; } } |
//拉取主题订阅信息,如果是空的,那么就结束本次消息拉去
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { //延迟拉取3s this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.warn(“find the consumer’s subscription failed, {}”, pullRequest); return; } |
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L; //获取提交偏移量,因为只有集群的是保存在本地的 if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0) { commitOffsetEnable = true; } } |
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { //消息过滤表达式的获取 subExpression = sd.getSubString(); } classFilter = sd.isClassFilterMode(); } //拼接sysFlag ,根据不同的条件是否满足,来进行拼接为一个int int sysFlag = PullSysFlag.buildSysFlag( commitOffsetEnable, // commitOffset true, // suspend subExpression != null, // subscription classFilter // class filter ); |
try {
//调用pullKernelImpl与服务器端进行交互 this.pullAPIWrapper.pullKernelImpl( //获取到消息队列 pullRequest.getMessageQueue(), //消息表达式 subExpression, //消息表达式类型 subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), //消息偏移量 pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), //系统标记 sysFlag, //当前MessageQueue的消息进度 commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, //回调函数 pullCallback ); } |
上述就是整体的函数调用
分为了消息队列的获取和确认,保证消息处理的性能
构建请求,进行请求交互
从这个MQPushConsumerImpl的函数调用之后,是pullKernellmpl
在这个pullKernellmpl中,是负责和Broker进行交互的,首先需要看的是整个pullKernelImpl的参数
1.在其中,首先获取到Broerk的地址
//根据brokerName和BrokerId,从Instance中获取Broker地址,注意返回的内部属性
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); } |
我们会进入这个findBroker的函数,查看其内部实现以及返回的FindBrokerResult的数据结构
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
//根据节点民称获取到所有的节点Id if (map != null && !map.isEmpty()) { brokerAddr = map.get(brokerId); slave = brokerId != MixAll.MASTER_ID; found = brokerAddr != null; //如果没找到并且是Id是从节点 if (!found && slave) { //从新获取 brokerAddr = map.get(brokerId + 1); found = brokerAddr != null; } if (!found && !onlyThisBroker) { Entry<Long, String> entry = map.entrySet().iterator().next(); brokerAddr = entry.getValue(); slave = entry.getKey() != MixAll.MASTER_ID; found = true; } } //返回数据结构,包含Broker地址,是否是从节点 Broker版本 if (found) { return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr)); } |
接下来组装对应的请求头
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType); |
然后进行实际的发送客户端的发送
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { //尝试替换brokerAddr brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr); } //实际利用MqClient发送pullRequest PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback); |
到这里,基本的请求消息就封装完成了