上一届说了,ConsumeQueue IndexFile 是基于CommitLog文件构建的,当消息生产者提交的消息存储在commitLog之后,就需要考虑如何将commitLog的数据转换为对应的ConsumeQueue和IndexFile了

RMQ是用一个新的异步线程来准时的从CommitLog转发的

这个线程被称为ReputMessageService

我们先从开启线程开始看起,看MessageStore如何启动的

在DefaultMessageStore的start中

对应的代码如下

this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);

this.reputMessageService.start();

在启动的时候,启动并初始化一个非常关键的参数

reputFromOffset,其含义为ReputMessageService从那个物理偏移量开始转发消息给ConsumeQueue 和 IndexFile

如果可以转发,就将其设置为commitLog的当前提交指针,如果不可以,就将reputFromOffset设置为commitLog的内存最大偏移量

对应的ReputMessageServcie是一个DefaultMessageStore的内部类

然后再对应的run中,

@Override

public void run() {

DefaultMessageStore.log.info(this.getServiceName() + ” service started”);

while (!this.isStopped()) {

try {

Thread.sleep(1);

this.doReput();

} catch (Exception e) {

DefaultMessageStore.log.warn(this.getServiceName() + ” service has exception. “, e);

}

}

DefaultMessageStore.log.info(this.getServiceName() + ” service end”);

}

此线程每次执行一次任务推送,然后休息1毫秒,继续尝试推送消息到小队列和索引文件

在其中,核心的推送在doPout函数之中

if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()

&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {

break;

}

首先进行校验,如果提交的物理偏移量不符合转发规范,那么就停止循环

SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);

首先是从commit中获取到从提交偏移量到现在所有的数据

然后进行循环

for (int readSize = 0; readSize < result.getSize() && doNext; ) {

从ByteBuffer中读取对应的消息,一次一条

然后校验其是否MsgSize大于0

然后进行相关的转发工作,代码如下

//获取一条消息

DispatchRequest dispatchRequest =

DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);

//然后进行校验消息大小是否正确

int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

if (dispatchRequest.isSuccess()) {

//的确有消息

if (size > 0) {

//进行转发

DefaultMessageStore.this.doDispatch(dispatchRequest);

然后调用

DefaultMessageStore.this.doDispatch(dispatchRequest);

最终走到的是

CommiLogDisptacherBuildConsumeQueue和 CommitLogDispatcherBuildIndex

我们先说一下进行重构建这个请求对应数据结构的核心类库

主要是一些核心的属性

private final String topic;

主题名称

private final int queueId;

消息队列Id

private final long commitLogOffset;

物理偏移量

private int msgSize;

消息长度

private final long storeTimestamp;

存储时间戳

private final long consumeQueueOffset;

消息队列偏移量

private final Map<String, String> propertiesMap;

消息属性

先说一下,如何构建ConsumeQueue的

在ConsumeQueue中,我们最终走到的是

DefaultMessageStore#putMessagePositionInfo

然后进行查找ConsumeQueue,这一步利用的消息主题和队列Id,这一步的操作比较简单,是利用的消费主题对应着下面有不同的消费队列的目录,其中有对应的ConsumeQueue

之后调用

ConsumeQueue#putMessagePositionInfoWrapper,

在这里进行存储一些不重要的东西到ConsumeQueueExt

在Wrapper中,继续调用,直到putMessagePositionInfo

在往下走的时候,发现内部将

消息偏移量 消息长度 tagscode写入到了ByteBuffer中

//初始化ByteBuffer

this.byteBufferIndex.flip();

this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);

this.byteBufferIndex.putLong(offset);

this.byteBufferIndex.putInt(size);

this.byteBufferIndex.putLong(tagsCode);

然后计算对应的物理地址

追加的方式写入到对应的MappedFile中

consumeQueue会异步的进行刷盘

之后是根据消息来更新Index文件

其实现类是CommitLogDispatcherBuildIndex

在其中调用了IndexService#buildIndex()

//首先获取IndexFile

IndexFile indexFile = retryGetAndCreateIndexFile();

if (indexFile != null) {

//获取到最大的物理偏移量

long endPhyOffset = indexFile.getEndPhyOffset();

DispatchRequest msg = req;

String topic = msg.getTopic();

String keys = msg.getKeys();

if (msg.getCommitLogOffset() < endPhyOffset) {

//小于了索引文件中的物理偏移量,重复数据,忽略本次索引构建

return;

}

之后就是相关的添加操作

if (req.getUniqKey() != null) {

indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));

if (indexFile == null) {

log.error(“putKey error commitlog {} uniqkey {}”, req.getCommitLogOffset(), req.getUniqKey());

return;

}

}

如果唯一键不为空,就添加到Hash索引中,加速根据唯一键检索消息

发表评论

邮箱地址不会被公开。 必填项已用*标注