RMQ的实际存储地址是 HOME下的store,主要存储的文件包含

commitlog,消息存储目录

config,配置 包含 主体消费过滤信息 集群消费模式消息消费进度

延迟消息队列拉取进度 消息消费组配置信息

topics.json 配置属性

consumequeue:消息消费队列存储目录

index 消息索引文件存储目录

abort 一个用于检测是否是正常关闭的文件

checkpoint 文件检测点

首先,最重要的,就是commitlog

commit的文件组成结构如下

前四个字节存储消息的总长度,后面存储消息,和mappendFile一致

对于commitlog的配置,可以在borker配置文件中设置storePathRootDir中改变路径,对于大小,可以通过broker配置文件改变默认大小

常见的API如下

public long getMinOffset() {

MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();

if (mappedFile != null) {

if (mappedFile.isAvailable()) {

return mappedFile.getFileFromOffset();

} else {

return this.rollNextFile(mappedFile.getFileFromOffset());

}

}

return -1;

}

获取最小的偏移量,首先是目录下的第一个文件,如果可用,则返回文件的起始偏移量,不然就返回下一个文件起始偏移量

public long rollNextFile(final long offset) {

int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();

return offset + mappedFileSize – offset % mappedFileSize;

}

获取了偏移量

最后呢,则是根据消息偏移量和消息长度来查找消息

public SelectMappedBufferResult getMessage(final long offset, final int size) {

//获取到对应的物理文件

int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();

MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);

if (mappedFile != null) {

//offset和文件大小取余获取到文件内的偏移量

int pos = (int) (offset % mappedFileSize);

//根据偏移量和长度来读取消息

return mappedFile.selectMappedBuffer(pos, size);

}

return null;

}

然后是ConsumeQueue文件

RMQ基于的是主题订阅模式来实现的消息消费,消费者关心的是一个主题下的消息,但是由于commitlog存储消息不区分主题,所以如果查找消息的时候直接遍历查找commitlog中的消息,效率将极其低下,为了方便按照消息主题来进行查找消息的需求,设计了ConsumeQueue文件,其实就是一个索引文件,只存储相关的偏移量,内部目录是按照主题为文件夹名的一级目录,下面还有一级是以消息队列为文件名的

CosumeQueue的存储格式基本如下

首先是8字节的commitlog offset 然后是4字节的size,存储的commitlog的消息长度,最后是8字节的tags hashcode,存储的是

单个ComsumeQueue文件中存有30万个条目,所以单个大小为 20 * 300000字节

这其实本质上就是commitlog文件的索引文件,构建的方式是在消息到达commitlog之后,由专门的线程生产消息转发任务,构建消息消费队列文件和索引文件

那么利用ConsumeQueue查找消息的api如下

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {

int mappedFileSize = this.mappedFileSize;

long offset = startIndex * CQ_STORE_UNIT_SIZE;

if (offset >= this.getMinLogicOffset()) {

MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);

if (mappedFile != null) {

SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));

return result;

}

}

return null;

}

在这个api中,首先将startIndex 乘以 20 获取到了offset

如果offset大于最小的逻辑偏移量,那么就根据偏移量定位到对应的物理文件,如果不是文件存在那么就取模获取到文件内容,从偏移量连续读取即可

其次的api是根据消息存储时间来查找具体偏移量的算法

public long getOffsetInQueueByTime(final long timestamp) {

//首先是根据时间戳定位到具体实现,就是找到第一个更新时间比传入的时间戳更大的文件

MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);

//如果不为空

if (mappedFile != null) {

//利用二分法加速查询

long offset = 0;

//计算最低查找偏移量,是否消息队列中最小偏移量大于该文件的起始偏移量,返回之差还是0

int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset – mappedFile.getFileFromOffset()) : 0;

int high = 0;

int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;

long leftIndexValue = -1L, rightIndexValue = -1L;

long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();

//获取一个完整的ByteBuffer

SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);

if (null != sbr) {

ByteBuffer byteBuffer = sbr.getByteBuffer();

//高位先设置为整体ByteBuffer减去20的位置

high = byteBuffer.limit() – CQ_STORE_UNIT_SIZE;

try {

while (high >= low) {

//获取中位

midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;

byteBuffer.position(midOffset);

//尝试获取物理偏移量

long phyOffset = byteBuffer.getLong();

int size = byteBuffer.getInt();

if (phyOffset < minPhysicOffset) {

//物理偏移量小于最小物理偏移量,说明待查找的物理偏移量大于midOffSet,需要继续向右折半查找

low = midOffset + CQ_STORE_UNIT_SIZE;

leftOffset = midOffset;

continue;

}

//走到这一步,说明偏移量已经在范围内了,进行判断,获取存储时间戳

long storeTime =

this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);

if (storeTime < 0) {

//无效消息

return 0;

} else if (storeTime == timestamp) {

//bingo 命中

targetOffset = midOffset;

break;

} else if (storeTime > timestamp) {

//时间戳大于查找的时间戳

//设置high为midoffset

high = midOffset – CQ_STORE_UNIT_SIZE;

rightOffset = midOffset;

rightIndexValue = storeTime;

} else {

//时间戳小于查找的时间戳

//设置low为midoffset

low = midOffset + CQ_STORE_UNIT_SIZE;

leftOffset = midOffset;

leftIndexValue = storeTime;

}

}

//跳出循环之后

if (targetOffset != -1) {

//如果targetOffset不是 -1的话

offset = targetOffset;

} else {

//接下来是根据leftIndexValue or rightIndexValue不为空来判断返回的是

//左边最靠近的还是右边最靠近的

if (leftIndexValue == -1) {

offset = rightOffset;

} else if (rightIndexValue == -1) {

offset = leftOffset;

} else {

offset =

Math.abs(timestamp – leftIndexValue) > Math.abs(timestamp

– rightIndexValue) ? rightOffset : leftOffset;

}

}

return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;

} finally {

sbr.release();

}

}

}

return 0;

}

最后是获取下一个文件的起始偏移量

public long rollNextFile(final long index) {

//获取这个文件长度

int mappedFileSize = this.mappedFileSize;

//一个文件的最大偏移量

int totalUnitsInFile = mappedFileSize / CQ_STORE_UNIT_SIZE;

//下一个文件的起始偏移量

return index + totalUnitsInFile – index % totalUnitsInFile;

}

最后我们说一下Index 索引文件

IndexFile是RMQ专门为了消息订阅而构建的索引文件,提高根据主题检索消息文件的速度

RMQ专门引入了Hash索引机制来为消息建立索引,索引文件的数据结构基本如下

IndexFile包含了IndexHeader Hash槽 Index条目这几个数据结构

IndexHeader是表明这个IndexFile统计信息的数据结构,结构主要有

beginTimeStamp 索引文件中最早存储时间,

endTimeStamp 索引文件中最大存储时间,上述两个由init时候确定

beginPhyoffset 索引文件中包含消息的最小物理偏移量,

endPhyoffset 索引文件最大物理偏移量 上述两个都是commitlog的偏移量

indexCount Index条目已经使用的个数,Index条目按照顺序存储

然后是Hash槽

Hash槽中的是hashcode中最新Index的索引

然后是Index条目,Index条目中维护了

key的hashcode用于校验

phyoffset 消息对应的物理偏移量

timedif 该消息存储时间与第一条消息时间戳的差值

preIndexNo 这个条目前一条记录的Index索引

那么在内存中,如何存为Map<String 消息索引,long 消息物理偏移量> 而且如何反过来找到的

RMQ将消息索引键以及消息偏移量映射关系存入到IndexFile的方法为 putKey

putKey(final String key /*消息索引*/, final long phyOffset /*物理偏移量*/, final long storeTimestamp /*存储时间*/)

在函数的内部执行中

首先我们判断indexCount是否小于 indexNum

indexNum是最大允许条目,大于就直接放入失败

然后就是索引文件没有写满,则算出key的hashCode

int keyHash = indexKeyHashMethod(key);

根据keyhash定位到hashcode对应的hash槽下标,这一步是利用了取余操作

int slotPos = keyHash % this.hashSlotNum;

然后获取到hash槽的物理地址

获取的计算方式是

hash槽的物理地址 = Index头部 40字节 + (下标 * 每个hash槽的大小,即4字节)

然后读取槽内的数据,如果槽内数据小于0或者大于当前索引文件中的索引条目格式,则设置slotValue为0

if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {

slotValue = invalidIndex;

}

然后计算带存储消息时间戳和第一条消息的时间戳的差值,换成秒

最后进行存储

计算新存储的条目的物理偏移量 = 头部40个字节 加上 (hash槽数量*4字节) + (Index条目数量 * Index条目大小即20个字节)

然后讲hashcode,物理偏移量 消息时间戳偏移量,hash槽的值放进去,最后上一位的slotValue放了进去

//计算了indexFile的长度

int absIndexPos =

IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize

+ this.indexHeader.getIndexCount() * indexSize;

//首先更新了后面的index条目,然后更新了前面的hash槽

this.mappedByteBuffer.putInt(absIndexPos, keyHash);

this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);

this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);

//将之前的保存在最后4位,方便链式查询

this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

//这一步,更新前面的头

this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

利用一种类似链表的数据结构,来将其进行顺序的绑定

而且只存储HashCode和物理偏移量,方便检索和ID你给我IE条目

之后是关于索引头的信息更新

this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

//初始化操作

if (this.indexHeader.getIndexCount() <= 1) {

this.indexHeader.setBeginPhyOffset(phyOffset);

this.indexHeader.setBeginTimestamp(storeTimestamp);

}

//比起旧版,还更新了hash槽的数量

if (invalidIndex == slotValue) {

this.indexHeader.incHashSlotCount();

}

this.indexHeader.incIndexCount();

this.indexHeader.setEndPhyOffset(phyOffset);

this.indexHeader.setEndTimestamp(storeTimestamp);

更新时间戳 条目等信息

下一个api是关于根据索引Key查找对应的消息

实现方法是selectPhyOffset

函数主要包含的是

public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,

final long begin, final long end, boolean lock) {

传入参数有 查找到的消息物理偏移量

索引Key

最大消息条数

开始时间戳

结束时间戳

首先根据key获取到对应的keyhash,然后进行取余到实际的hash槽下标,然后获取到对应的物理地址

获取方式很简单 IndexHeader + (下标 * 4字节)

int keyHash = indexKeyHashMethod(key);

int slotPos = keyHash % this.hashSlotNum;

//拿到hash槽的物理地址

int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

然后获取对应的Hash槽中存储的数据,如果小于1或者大于当前索引个数,就说明没有HashCode对应的值,直接返回

int slotValue = this.mappedByteBuffer.getInt(absSlotPos);

// if (fileLock != null) {

// fileLock.release();

// fileLock = null;

// }

if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()

|| this.indexHeader.getIndexCount() <= 1) {

然后进行for循环的查找,从最新的Item开始查找

只要查找到的数据大于1且小于最大条目数,那么就继续查找,不然就结束查找

获取的数据在下面,分别是hashcode 偏移量 时间差 上一个条目的Index下标

int absIndexPos =

IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize

+ nextIndexToRead * indexSize;

int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);

long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);

int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

然后检测存储的时间差是不是在时间范围内,且验证过对应的Index索引,索引大于等于1且小于Index条目数,就继续查找,不然就结束流程

if (timeDiff < 0) {

break;

}

timeDiff *= 1000L;

long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;

boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

if (keyHash == keyHashRead && timeMatched) {

phyOffsets.add(phyOffsetRead);

}

最后还有一个不起眼的小文件

checkpoint文件,记录commitLog ConsumeQueue Index文件输盘文件的时间点

基本的数据结构为

commitLog 文件刷盘时间点

logicsMsgTimestamp 消息消费队列文件刷盘时间点

indexMsgTimestamp 索引文件刷盘时间点

发表评论

邮箱地址不会被公开。 必填项已用*标注