在pullMessage函数中,会根据是否是异步来调用不同的子函数

比如在异步的子函数中 会在返回数据之后分别调用callback的onSuccess和onException

try {

PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);

assert pullResult != null;

//拉取完成,返回的入口

//分别走成功和失败

pullCallback.onSuccess(pullResult);

} catch (Exception e) {

pullCallback.onException(e);

}

这个pullCallback在DefaultMQPushConsumerImpl中创建

在进入onSuccess之前,还有一步操作,就是转换ResponseCode

switch (response.getCode()) {

case ResponseCode.SUCCESS:

pullStatus = PullStatus.FOUND;

break;

case ResponseCode.PULL_NOT_FOUND:

pullStatus = PullStatus.NO_NEW_MSG;

break;

case ResponseCode.PULL_RETRY_IMMEDIATELY:

pullStatus = PullStatus.NO_MATCHED_MSG;

break;

case ResponseCode.PULL_OFFSET_MOVED:

pullStatus = PullStatus.OFFSET_ILLEGAL;

break;

在onSuccess中,第一步进行Result的解析

PullAPIWrapper中的processPullResult进行了返回值的解析

//前置解析

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,

final SubscriptionData subscriptionData) {

PullResultExt pullResultExt = (PullResultExt) pullResult;

this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());

if (PullStatus.FOUND == pullResult.getPullStatus()) {

ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());

List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

此函数中,还涉及到了消息的过滤

然后在callback中,如果返回的结果是Found

那么就将消息存入到ProcessQueue中,将拉取到的消息提交到ConsumerMessageService中供消费者消费

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(

pullResult.getMsgFoundList(),

processQueue,

pullRequest.getMessageQueue(),

dispatchToConsume);

具体的消费细节无需我们关注

在放置完成,我们需要根据设置的间隔,判断是否需要等待一定时间进行放入

if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,

DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());

} else {

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

}

然后,我们看一下,如果返回的状态不是FOUND的话,应该如何矫正偏移量

如果返回的是NO_NEW_MSG或者NO_MATCHED_MSG

那么直接使用服务器返回的偏移量进行下一次的拉取

NO_NEW_MSG对应的是OFFSET_FOUND_NULL,OFFSET_OVERFLOW_ONE

OFFSET_OVERFLOW_ONE代表着是消息队列中最大的偏移量,如果有新的消息达到,会创建一个新的CQ文件,

如果是FOUND_NULL,那么就是根据去下一个CQ文件中查询,下一次的偏移量就是offset+(一个CQ文件的Size)

如果是OFFSET_ILLEGAL的话,意味着拉取偏移量违法,整体的处理如下

//设置这个ProcessQueue为丢弃

pullRequest.getProcessQueue().setDropped(true);

DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

@Override

public void run() {

try {

DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),

pullRequest.getNextOffset(), false);

DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

log.warn(“fix the pull request offset, {}”, pullRequest);

} catch (Throwable e) {

log.error(“executeTaskLater Exception”, e);

}

}

}, 10000);

在其中,尝试更新了Offset

然后将ProcessQueue从rebalance中移除了,等待下一次的拉取

在OFFSET_ILLEGAL对应的Response中的MATCHED_LOGIC_QUEUE,NO_MESSAGE_IN_QUEUE,OFFSET_OVERFLOW_BADLY,OFFSET_TOO_SMALL

一般返回的矫正的数值为原offset的,保证消息的正确性

发表评论

邮箱地址不会被公开。 必填项已用*标注