书接上文
因为RMQ并没有真正的实现消息的推模式,而消息者主动向消息服务器发送拉取消息
如果消息消费者发送了消息拉去的请求,到达了服务端,没有开启长轮询机制,那么会在服务器端等待一段时间之后再次拉取,如果再次拉取没有结果,则返回一个PULL_NOT_FOUND 消息不存在给消费端,如果开启了长轮询,则RMQ则会每5s检查一次消息是否科大,同时一旦有消息达到就立刻通知线程再次验证新消息是否是自己感兴趣的消息,如果符合拉取的,则从commitlog中拉取返回给客户端,而这个超时时间,由客户端定义
而长轮询是否开启,则是在Broker端配置的longPollingEnable为true来开启
我们首先看下,在消息拉取的时候, 如果没有找到消息的时候处理逻辑
位于PullMessageProcessor#的processRequest
case ResponseCode.PULL_NOT_FOUND:
//查看是否挂起 if (brokerAllowSuspend && hasSuspendFlag) { //请求带来的过期时间 long pollingTimeMills = suspendTimeoutMillisLong; if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { //是否支持长轮询 pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); //创建一个新的PullRequest PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); //放入等待一段时间 this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; } |
在上面中,根据是否开启长轮询来决定挂起的方式,如果支持则创建PullRequest
brokerAllowSuspend
就是来判断是否可以再次放入的
再往下,则是看消息拉取线程 PullRequestHoldService线程\
其内部由两个线程共同维护
PullRequestHoldService和DefaultMessageStore#ReputMessageService
共同维护
首先是看PullRequestHoldService线程
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
//通过topic和队列确定一个唯一RequestTable String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (null == mpr) { //一个ManyPullRequest是一个队列 mpr = new ManyPullRequest(); ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr); if (prev != null) { mpr = prev; } } mpr.addPullRequest(pullRequest); } |
首先是构建一个key,利用这个key确定一个队列
这个队列内部,持有了一个PullRequest的队列,表示一个主题队列的累加任务
while (!this.isStopped()) {
try { //确定是不是长轮询 if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { //然后先进行等待 this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); //核心逻辑 this.checkHoldRequest(); |
然后进行遍历获取
//检查等待的请求
private void checkHoldRequest() { for (String key : this.pullRequestTable.keySet()) { //遍历任务拉取表 String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); //根据主题和队列获取到其最大偏移量 final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); try { //有新的消息到了的话,就进行通知 this.notifyMessageArriving(topic, queueId, offset); } catch (Throwable e) { log.error(“check hold request failed. topic={}, queueId={}”, topic, queueId, e); } } } } |
在核心处理流程中,我们进行相关处理返回
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { //re build这个key String key = this.buildKey(topic, queueId); //获取Request的队列 ManyPullRequest mpr = this.pullRequestTable.get(key); if (mpr != null) { //获取到PullRequest的列表 List<PullRequest> requestList = mpr.cloneListAndClear(); if (requestList != null) { List<PullRequest> replayList = new ArrayList<PullRequest>(); for (PullRequest request : requestList) { //获取到每一个请求的offset long newestOffset = maxOffset; if (newestOffset <= request.getPullFromThisOffset()) { newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); } //检验完成后,获取到Offset if (newestOffset > request.getPullFromThisOffset()) { boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)); // match by bit map, need eval again when properties is not null. if (match && properties != null) { match = request.getMessageFilter().isMatchedByCommitLog(null, properties); } if (match) { try { //消息返回给客户端 this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error(“execute request when wakeup failed.”, e); } continue; } } if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { //超时的处理流程,直接返回 try { this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error(“execute request when wakeup failed.”, e); } continue; } |
在executeRequestWhenWakeup,则是真正的去拉取消息,封装Response,也就是将其ResponseCode进行设置的地方
之前还说过,可以将PullRquest唤醒的地方,还有一个DefaultMessageStore的ReputMessageService
其之前说过,负责将消息转发到ConsumeQueue IndexFile,以及长轮询的作用
这一次主要是观察其对于长轮询的相关代码
在其中
//此处进行唤醒Request
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { //通知Listener,消息到达了 DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } |
在这一步里面,直接通知Listener 消息到达
在实际的实现者NotifyMessageArrivingListener
在其中,
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties); } |
还是走到了pullRequestHoldSerivce的MessageArriving中了唤醒了
这一样就是一个长轮询的逻辑