在Broker中,我们根据Header中的RequestCode获取到注册的处理器
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); |
在处理函数processRequest中
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { return this.processRequest(ctx.channel(), request, true); } |
进行消息查找
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter); |
在这里面进行获取消息
group 消息组名称
topic 主题名称
queueId 队列ID
queueOffset待拉取偏移量
maxMsgNums最大拉取消息条数
messageFilter 消息过滤器
首先初始化消息变量
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset; long minOffset = 0; long maxOffset = 0; //创建一个新的Result GetMessageResult getResult = new GetMessageResult(); //获取最大偏移量 final long maxOffsetPy = this.commitLog.getMaxOffset(); |
上面分别是 待查找的偏移量 最小偏移量 最大偏移量
//根据Queue初始化偏移量
minOffset = consumeQueue.getMinOffsetInQueue(); maxOffset = consumeQueue.getMaxOffsetInQueue(); //根据状态以及下一次拉取的偏移量 if (maxOffset == 0) { status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } else if (offset < minOffset) { status = GetMessageStatus.OFFSET_TOO_SMALL; nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else if (offset == maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_ONE; nextBeginOffset = nextOffsetCorrection(offset, offset); } else if (offset > maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; if (0 == minOffset) { nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else { nextBeginOffset = nextOffsetCorrection(offset, maxOffset); } } |
如果maxOffset为0,那么 设置拉取结果为NO_MESSAGE_IN_QUEUE ,而且下一次的拉取偏移量为主节点的话,拉取偏移量为0
如果是从节点,看offsetCheckInSlave为true,设置下一次偏移量为0
如果offset小于minoffset,说明待拉取的小于队列起始时间,设置为OFFSET_TOO_SMALL ,如果为Borker为主节点,Broker为从节点且offsetCheckInSlave为true,下次偏移量为队列最小偏移量,不然就还是offset
如果相等,那么偏移量等于最大偏移量,那么下次还是这个拉取偏移量
如果大于最大偏移量,那么说明已经越界了,尝试使用最大或者最小的偏移量进行纠正
完成上述流程进行下一步的拉取,先概述一下,直接拉取32条消息
这一点就不赘述了
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset); getResult.setMaxOffset(maxOffset); getResult.setMinOffset(minOffset); |
返回的时候,设置下一次拉取的偏移量 和 最大偏移量 以及最小偏移量
/获取到消息返回
if (getMessageResult != null) { //设置下一次的拉取偏移量 response.setRemark(getMessageResult.getStatus().name()); responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset()); responseHeader.setMinOffset(getMessageResult.getMinOffset()); responseHeader.setMaxOffset(getMessageResult.getMaxOffset()); //设置下一次的brokerId if (getMessageResult.isSuggestPullingFromSlave()) { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } else { responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); } |
接下来进行状态的转换
//设置状态,进行状态转换
switch (getMessageResult.getStatus()) { case FOUND: response.setCode(ResponseCode.SUCCESS); break; case MESSAGE_WAS_REMOVING: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); break; case NO_MATCHED_LOGIC_QUEUE: case NO_MESSAGE_IN_QUEUE: if (0 != requestHeader.getQueueOffset()) { response.setCode(ResponseCode.PULL_OFFSET_MOVED); // XXX: warn and notify me log.info(“the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}”, requestHeader.getQueueOffset(), getMessageResult.getNextBeginOffset(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getConsumerGroup() ); } else { response.setCode(ResponseCode.PULL_NOT_FOUND); } break; case NO_MATCHED_MESSAGE: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); break; case OFFSET_FOUND_NULL: response.setCode(ResponseCode.PULL_NOT_FOUND); break; case OFFSET_OVERFLOW_BADLY: response.setCode(ResponseCode.PULL_OFFSET_MOVED); // XXX: warn and notify me log.info(“the request offset: {} over flow badly, broker max offset: {}, consumer: {}”, requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress()); break; case OFFSET_OVERFLOW_ONE: response.setCode(ResponseCode.PULL_NOT_FOUND); break; case OFFSET_TOO_SMALL: response.setCode(ResponseCode.PULL_OFFSET_MOVED); log.info(“the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}”, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(), getMessageResult.getMinOffset(), channel.remoteAddress()); break; default: assert false; break; } |
接下来进行返回,那么得到返回的客户端,需要就返回的状态来进行校验偏移量