在pullMessage函数中,会根据是否是异步来调用不同的子函数
比如在异步的子函数中 会在返回数据之后分别调用callback的onSuccess和onException
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr); assert pullResult != null; //拉取完成,返回的入口 //分别走成功和失败 pullCallback.onSuccess(pullResult); } catch (Exception e) { pullCallback.onException(e); } |
这个pullCallback在DefaultMQPushConsumerImpl中创建
在进入onSuccess之前,还有一步操作,就是转换ResponseCode
switch (response.getCode()) {
case ResponseCode.SUCCESS: pullStatus = PullStatus.FOUND; break; case ResponseCode.PULL_NOT_FOUND: pullStatus = PullStatus.NO_NEW_MSG; break; case ResponseCode.PULL_RETRY_IMMEDIATELY: pullStatus = PullStatus.NO_MATCHED_MSG; break; case ResponseCode.PULL_OFFSET_MOVED: pullStatus = PullStatus.OFFSET_ILLEGAL; break; |
在onSuccess中,第一步进行Result的解析
PullAPIWrapper中的processPullResult进行了返回值的解析
//前置解析
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, final SubscriptionData subscriptionData) { PullResultExt pullResultExt = (PullResultExt) pullResult; this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); if (PullStatus.FOUND == pullResult.getPullStatus()) { ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer); |
此函数中,还涉及到了消息的过滤
然后在callback中,如果返回的结果是Found
那么就将消息存入到ProcessQueue中,将拉取到的消息提交到ConsumerMessageService中供消费者消费
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); |
具体的消费细节无需我们关注
在放置完成,我们需要根据设置的间隔,判断是否需要等待一定时间进行放入
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } |
然后,我们看一下,如果返回的状态不是FOUND的话,应该如何矫正偏移量
如果返回的是NO_NEW_MSG或者NO_MATCHED_MSG
那么直接使用服务器返回的偏移量进行下一次的拉取
NO_NEW_MSG对应的是OFFSET_FOUND_NULL,OFFSET_OVERFLOW_ONE
OFFSET_OVERFLOW_ONE代表着是消息队列中最大的偏移量,如果有新的消息达到,会创建一个新的CQ文件,
如果是FOUND_NULL,那么就是根据去下一个CQ文件中查询,下一次的偏移量就是offset+(一个CQ文件的Size)
如果是OFFSET_ILLEGAL的话,意味着拉取偏移量违法,整体的处理如下
//设置这个ProcessQueue为丢弃
pullRequest.getProcessQueue().setDropped(true); DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { @Override public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn(“fix the pull request offset, {}”, pullRequest); } catch (Throwable e) { log.error(“executeTaskLater Exception”, e); } } }, 10000); |
在其中,尝试更新了Offset
然后将ProcessQueue从rebalance中移除了,等待下一次的拉取
在OFFSET_ILLEGAL对应的Response中的MATCHED_LOGIC_QUEUE,NO_MESSAGE_IN_QUEUE,OFFSET_OVERFLOW_BADLY,OFFSET_TOO_SMALL
一般返回的矫正的数值为原offset的,保证消息的正确性