在Broker中,我们根据Header中的RequestCode获取到注册的处理器

this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);

在处理函数processRequest中

@Override

public RemotingCommand processRequest(final ChannelHandlerContext ctx,

RemotingCommand request) throws RemotingCommandException {

return this.processRequest(ctx.channel(), request, true);

}

进行消息查找

final GetMessageResult getMessageResult =

this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),

requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

在这里面进行获取消息

group 消息组名称

topic 主题名称

queueId 队列ID

queueOffset待拉取偏移量

maxMsgNums最大拉取消息条数

messageFilter 消息过滤器

首先初始化消息变量

GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;

long nextBeginOffset = offset;

long minOffset = 0;

long maxOffset = 0;

//创建一个新的Result

GetMessageResult getResult = new GetMessageResult();

//获取最大偏移量

final long maxOffsetPy = this.commitLog.getMaxOffset();

上面分别是 待查找的偏移量 最小偏移量 最大偏移量

//根据Queue初始化偏移量

minOffset = consumeQueue.getMinOffsetInQueue();

maxOffset = consumeQueue.getMaxOffsetInQueue();

//根据状态以及下一次拉取的偏移量

if (maxOffset == 0) {

status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;

nextBeginOffset = nextOffsetCorrection(offset, 0);

} else if (offset < minOffset) {

status = GetMessageStatus.OFFSET_TOO_SMALL;

nextBeginOffset = nextOffsetCorrection(offset, minOffset);

} else if (offset == maxOffset) {

status = GetMessageStatus.OFFSET_OVERFLOW_ONE;

nextBeginOffset = nextOffsetCorrection(offset, offset);

} else if (offset > maxOffset) {

status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;

if (0 == minOffset) {

nextBeginOffset = nextOffsetCorrection(offset, minOffset);

} else {

nextBeginOffset = nextOffsetCorrection(offset, maxOffset);

}

}

如果maxOffset为0,那么 设置拉取结果为NO_MESSAGE_IN_QUEUE ,而且下一次的拉取偏移量为主节点的话,拉取偏移量为0

如果是从节点,看offsetCheckInSlave为true,设置下一次偏移量为0

如果offset小于minoffset,说明待拉取的小于队列起始时间,设置为OFFSET_TOO_SMALL ,如果为Borker为主节点,Broker为从节点且offsetCheckInSlave为true,下次偏移量为队列最小偏移量,不然就还是offset

如果相等,那么偏移量等于最大偏移量,那么下次还是这个拉取偏移量

如果大于最大偏移量,那么说明已经越界了,尝试使用最大或者最小的偏移量进行纠正

完成上述流程进行下一步的拉取,先概述一下,直接拉取32条消息

这一点就不赘述了

getResult.setStatus(status);

getResult.setNextBeginOffset(nextBeginOffset);

getResult.setMaxOffset(maxOffset);

getResult.setMinOffset(minOffset);

返回的时候,设置下一次拉取的偏移量 和 最大偏移量 以及最小偏移量

/获取到消息返回

if (getMessageResult != null) {

//设置下一次的拉取偏移量

response.setRemark(getMessageResult.getStatus().name());

responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());

responseHeader.setMinOffset(getMessageResult.getMinOffset());

responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

//设置下一次的brokerId

if (getMessageResult.isSuggestPullingFromSlave()) {

responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());

} else {

responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

}

接下来进行状态的转换

//设置状态,进行状态转换

switch (getMessageResult.getStatus()) {

case FOUND:

response.setCode(ResponseCode.SUCCESS);

break;

case MESSAGE_WAS_REMOVING:

response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);

break;

case NO_MATCHED_LOGIC_QUEUE:

case NO_MESSAGE_IN_QUEUE:

if (0 != requestHeader.getQueueOffset()) {

response.setCode(ResponseCode.PULL_OFFSET_MOVED);

// XXX: warn and notify me

log.info(“the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}”,

requestHeader.getQueueOffset(),

getMessageResult.getNextBeginOffset(),

requestHeader.getTopic(),

requestHeader.getQueueId(),

requestHeader.getConsumerGroup()

);

} else {

response.setCode(ResponseCode.PULL_NOT_FOUND);

}

break;

case NO_MATCHED_MESSAGE:

response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);

break;

case OFFSET_FOUND_NULL:

response.setCode(ResponseCode.PULL_NOT_FOUND);

break;

case OFFSET_OVERFLOW_BADLY:

response.setCode(ResponseCode.PULL_OFFSET_MOVED);

// XXX: warn and notify me

log.info(“the request offset: {} over flow badly, broker max offset: {}, consumer: {}”,

requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());

break;

case OFFSET_OVERFLOW_ONE:

response.setCode(ResponseCode.PULL_NOT_FOUND);

break;

case OFFSET_TOO_SMALL:

response.setCode(ResponseCode.PULL_OFFSET_MOVED);

log.info(“the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}”,

requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),

getMessageResult.getMinOffset(), channel.remoteAddress());

break;

default:

assert false;

break;

}

接下来进行返回,那么得到返回的客户端,需要就返回的状态来进行校验偏移量

发表评论

邮箱地址不会被公开。 必填项已用*标注