上一届说了,ConsumeQueue IndexFile 是基于CommitLog文件构建的,当消息生产者提交的消息存储在commitLog之后,就需要考虑如何将commitLog的数据转换为对应的ConsumeQueue和IndexFile了
RMQ是用一个新的异步线程来准时的从CommitLog转发的
这个线程被称为ReputMessageService
我们先从开启线程开始看起,看MessageStore如何启动的
在DefaultMessageStore的start中
对应的代码如下
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start(); |
在启动的时候,启动并初始化一个非常关键的参数
reputFromOffset,其含义为ReputMessageService从那个物理偏移量开始转发消息给ConsumeQueue 和 IndexFile
如果可以转发,就将其设置为commitLog的当前提交指针,如果不可以,就将reputFromOffset设置为commitLog的内存最大偏移量
对应的ReputMessageServcie是一个DefaultMessageStore的内部类
然后再对应的run中,
@Override
public void run() { DefaultMessageStore.log.info(this.getServiceName() + ” service started”); while (!this.isStopped()) { try { Thread.sleep(1); this.doReput(); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + ” service has exception. “, e); } } DefaultMessageStore.log.info(this.getServiceName() + ” service end”); } |
此线程每次执行一次任务推送,然后休息1毫秒,继续尝试推送消息到小队列和索引文件
在其中,核心的推送在doPout函数之中
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } |
首先进行校验,如果提交的物理偏移量不符合转发规范,那么就停止循环
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); |
首先是从commit中获取到从提交偏移量到现在所有的数据
然后进行循环
for (int readSize = 0; readSize < result.getSize() && doNext; ) { |
从ByteBuffer中读取对应的消息,一次一条
然后校验其是否MsgSize大于0
然后进行相关的转发工作,代码如下
//获取一条消息
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); //然后进行校验消息大小是否正确 int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize(); if (dispatchRequest.isSuccess()) { //的确有消息 if (size > 0) { //进行转发 DefaultMessageStore.this.doDispatch(dispatchRequest); |
然后调用
DefaultMessageStore.this.doDispatch(dispatchRequest);
最终走到的是
CommiLogDisptacherBuildConsumeQueue和 CommitLogDispatcherBuildIndex
我们先说一下进行重构建这个请求对应数据结构的核心类库
主要是一些核心的属性
private final String topic;
主题名称
private final int queueId;
消息队列Id
private final long commitLogOffset;
物理偏移量
private int msgSize;
消息长度
private final long storeTimestamp;
存储时间戳
private final long consumeQueueOffset;
消息队列偏移量
private final Map<String, String> propertiesMap;
消息属性
先说一下,如何构建ConsumeQueue的
在ConsumeQueue中,我们最终走到的是
DefaultMessageStore#putMessagePositionInfo
然后进行查找ConsumeQueue,这一步利用的消息主题和队列Id,这一步的操作比较简单,是利用的消费主题对应着下面有不同的消费队列的目录,其中有对应的ConsumeQueue
之后调用
ConsumeQueue#putMessagePositionInfoWrapper,
在这里进行存储一些不重要的东西到ConsumeQueueExt
在Wrapper中,继续调用,直到putMessagePositionInfo
在往下走的时候,发现内部将
消息偏移量 消息长度 tagscode写入到了ByteBuffer中
//初始化ByteBuffer
this.byteBufferIndex.flip(); this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode); |
然后计算对应的物理地址
追加的方式写入到对应的MappedFile中
consumeQueue会异步的进行刷盘
之后是根据消息来更新Index文件
其实现类是CommitLogDispatcherBuildIndex
在其中调用了IndexService#buildIndex()
//首先获取IndexFile
IndexFile indexFile = retryGetAndCreateIndexFile(); if (indexFile != null) { //获取到最大的物理偏移量 long endPhyOffset = indexFile.getEndPhyOffset(); DispatchRequest msg = req; String topic = msg.getTopic(); String keys = msg.getKeys(); if (msg.getCommitLogOffset() < endPhyOffset) { //小于了索引文件中的物理偏移量,重复数据,忽略本次索引构建 return; } |
之后就是相关的添加操作
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey())); if (indexFile == null) { log.error(“putKey error commitlog {} uniqkey {}”, req.getCommitLogOffset(), req.getUniqKey()); return; } } |
如果唯一键不为空,就添加到Hash索引中,加速根据唯一键检索消息