RMQ的实际存储地址是 HOME下的store,主要存储的文件包含
commitlog,消息存储目录
config,配置 包含 主体消费过滤信息 集群消费模式消息消费进度
延迟消息队列拉取进度 消息消费组配置信息
topics.json 配置属性
consumequeue:消息消费队列存储目录
index 消息索引文件存储目录
abort 一个用于检测是否是正常关闭的文件
checkpoint 文件检测点
首先,最重要的,就是commitlog
commit的文件组成结构如下
前四个字节存储消息的总长度,后面存储消息,和mappendFile一致
对于commitlog的配置,可以在borker配置文件中设置storePathRootDir中改变路径,对于大小,可以通过broker配置文件改变默认大小
常见的API如下
public long getMinOffset() {
MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile(); if (mappedFile != null) { if (mappedFile.isAvailable()) { return mappedFile.getFileFromOffset(); } else { return this.rollNextFile(mappedFile.getFileFromOffset()); } } return -1; } |
获取最小的偏移量,首先是目录下的第一个文件,如果可用,则返回文件的起始偏移量,不然就返回下一个文件起始偏移量
public long rollNextFile(final long offset) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); return offset + mappedFileSize – offset % mappedFileSize; } |
获取了偏移量
最后呢,则是根据消息偏移量和消息长度来查找消息
public SelectMappedBufferResult getMessage(final long offset, final int size) {
//获取到对应的物理文件 int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { //offset和文件大小取余获取到文件内的偏移量 int pos = (int) (offset % mappedFileSize); //根据偏移量和长度来读取消息 return mappedFile.selectMappedBuffer(pos, size); } return null; } |
然后是ConsumeQueue文件
RMQ基于的是主题订阅模式来实现的消息消费,消费者关心的是一个主题下的消息,但是由于commitlog存储消息不区分主题,所以如果查找消息的时候直接遍历查找commitlog中的消息,效率将极其低下,为了方便按照消息主题来进行查找消息的需求,设计了ConsumeQueue文件,其实就是一个索引文件,只存储相关的偏移量,内部目录是按照主题为文件夹名的一级目录,下面还有一级是以消息队列为文件名的
CosumeQueue的存储格式基本如下
首先是8字节的commitlog offset 然后是4字节的size,存储的commitlog的消息长度,最后是8字节的tags hashcode,存储的是
单个ComsumeQueue文件中存有30万个条目,所以单个大小为 20 * 300000字节
这其实本质上就是commitlog文件的索引文件,构建的方式是在消息到达commitlog之后,由专门的线程生产消息转发任务,构建消息消费队列文件和索引文件
那么利用ConsumeQueue查找消息的api如下
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
int mappedFileSize = this.mappedFileSize; long offset = startIndex * CQ_STORE_UNIT_SIZE; if (offset >= this.getMinLogicOffset()) { MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset); if (mappedFile != null) { SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); return result; } } return null; } |
在这个api中,首先将startIndex 乘以 20 获取到了offset
如果offset大于最小的逻辑偏移量,那么就根据偏移量定位到对应的物理文件,如果不是文件存在那么就取模获取到文件内容,从偏移量连续读取即可
其次的api是根据消息存储时间来查找具体偏移量的算法
public long getOffsetInQueueByTime(final long timestamp) {
//首先是根据时间戳定位到具体实现,就是找到第一个更新时间比传入的时间戳更大的文件 MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp); //如果不为空 if (mappedFile != null) { //利用二分法加速查询 long offset = 0; //计算最低查找偏移量,是否消息队列中最小偏移量大于该文件的起始偏移量,返回之差还是0 int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset – mappedFile.getFileFromOffset()) : 0; int high = 0; int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1; long leftIndexValue = -1L, rightIndexValue = -1L; long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset(); //获取一个完整的ByteBuffer SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); if (null != sbr) { ByteBuffer byteBuffer = sbr.getByteBuffer(); //高位先设置为整体ByteBuffer减去20的位置 high = byteBuffer.limit() – CQ_STORE_UNIT_SIZE; try { while (high >= low) { //获取中位 midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE; byteBuffer.position(midOffset); //尝试获取物理偏移量 long phyOffset = byteBuffer.getLong(); int size = byteBuffer.getInt(); if (phyOffset < minPhysicOffset) { //物理偏移量小于最小物理偏移量,说明待查找的物理偏移量大于midOffSet,需要继续向右折半查找 low = midOffset + CQ_STORE_UNIT_SIZE; leftOffset = midOffset; continue; } //走到这一步,说明偏移量已经在范围内了,进行判断,获取存储时间戳 long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); if (storeTime < 0) { //无效消息 return 0; } else if (storeTime == timestamp) { //bingo 命中 targetOffset = midOffset; break; } else if (storeTime > timestamp) { //时间戳大于查找的时间戳 //设置high为midoffset high = midOffset – CQ_STORE_UNIT_SIZE; rightOffset = midOffset; rightIndexValue = storeTime; } else { //时间戳小于查找的时间戳 //设置low为midoffset low = midOffset + CQ_STORE_UNIT_SIZE; leftOffset = midOffset; leftIndexValue = storeTime; } } //跳出循环之后 if (targetOffset != -1) { //如果targetOffset不是 -1的话 offset = targetOffset; } else { //接下来是根据leftIndexValue or rightIndexValue不为空来判断返回的是 //左边最靠近的还是右边最靠近的 if (leftIndexValue == -1) { offset = rightOffset; } else if (rightIndexValue == -1) { offset = leftOffset; } else { offset = Math.abs(timestamp – leftIndexValue) > Math.abs(timestamp – rightIndexValue) ? rightOffset : leftOffset; } } return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE; } finally { sbr.release(); } } } return 0; } |
最后是获取下一个文件的起始偏移量
public long rollNextFile(final long index) {
//获取这个文件长度 int mappedFileSize = this.mappedFileSize; //一个文件的最大偏移量 int totalUnitsInFile = mappedFileSize / CQ_STORE_UNIT_SIZE; //下一个文件的起始偏移量 return index + totalUnitsInFile – index % totalUnitsInFile; } |
最后我们说一下Index 索引文件
IndexFile是RMQ专门为了消息订阅而构建的索引文件,提高根据主题检索消息文件的速度
RMQ专门引入了Hash索引机制来为消息建立索引,索引文件的数据结构基本如下
IndexFile包含了IndexHeader Hash槽 Index条目这几个数据结构
IndexHeader是表明这个IndexFile统计信息的数据结构,结构主要有
beginTimeStamp 索引文件中最早存储时间,
endTimeStamp 索引文件中最大存储时间,上述两个由init时候确定
beginPhyoffset 索引文件中包含消息的最小物理偏移量,
endPhyoffset 索引文件最大物理偏移量 上述两个都是commitlog的偏移量
indexCount Index条目已经使用的个数,Index条目按照顺序存储
然后是Hash槽
Hash槽中的是hashcode中最新Index的索引
然后是Index条目,Index条目中维护了
key的hashcode用于校验
phyoffset 消息对应的物理偏移量
timedif 该消息存储时间与第一条消息时间戳的差值
preIndexNo 这个条目前一条记录的Index索引
那么在内存中,如何存为Map<String 消息索引,long 消息物理偏移量> 而且如何反过来找到的
RMQ将消息索引键以及消息偏移量映射关系存入到IndexFile的方法为 putKey
putKey(final String key /*消息索引*/, final long phyOffset /*物理偏移量*/, final long storeTimestamp /*存储时间*/)
在函数的内部执行中
首先我们判断indexCount是否小于 indexNum
indexNum是最大允许条目,大于就直接放入失败
然后就是索引文件没有写满,则算出key的hashCode
int keyHash = indexKeyHashMethod(key);
根据keyhash定位到hashcode对应的hash槽下标,这一步是利用了取余操作
int slotPos = keyHash % this.hashSlotNum;
然后获取到hash槽的物理地址
获取的计算方式是
hash槽的物理地址 = Index头部 40字节 + (下标 * 每个hash槽的大小,即4字节)
然后读取槽内的数据,如果槽内数据小于0或者大于当前索引文件中的索引条目格式,则设置slotValue为0
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
然后计算带存储消息时间戳和第一条消息的时间戳的差值,换成秒
最后进行存储
计算新存储的条目的物理偏移量 = 头部40个字节 加上 (hash槽数量*4字节) + (Index条目数量 * Index条目大小即20个字节)
然后讲hashcode,物理偏移量 消息时间戳偏移量,hash槽的值放进去,最后上一位的slotValue放了进去
//计算了indexFile的长度
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; //首先更新了后面的index条目,然后更新了前面的hash槽 this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); //将之前的保存在最后4位,方便链式查询 this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); //这一步,更新前面的头 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); |
利用一种类似链表的数据结构,来将其进行顺序的绑定
而且只存储HashCode和物理偏移量,方便检索和ID你给我IE条目
之后是关于索引头的信息更新
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
//初始化操作 if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); } //比起旧版,还更新了hash槽的数量 if (invalidIndex == slotValue) { this.indexHeader.incHashSlotCount(); } this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); |
更新时间戳 条目等信息
下一个api是关于根据索引Key查找对应的消息
实现方法是selectPhyOffset
函数主要包含的是
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) { |
传入参数有 查找到的消息物理偏移量
索引Key
最大消息条数
开始时间戳
结束时间戳
首先根据key获取到对应的keyhash,然后进行取余到实际的hash槽下标,然后获取到对应的物理地址
获取方式很简单 IndexHeader + (下标 * 4字节)
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum; //拿到hash槽的物理地址 int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; |
然后获取对应的Hash槽中存储的数据,如果小于1或者大于当前索引个数,就说明没有HashCode对应的值,直接返回
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// if (fileLock != null) { // fileLock.release(); // fileLock = null; // } if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { |
然后进行for循环的查找,从最新的Item开始查找
只要查找到的数据大于1且小于最大条目数,那么就继续查找,不然就结束查找
获取的数据在下面,分别是hashcode 偏移量 时间差 上一个条目的Index下标
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize; int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); |
然后检测存储的时间差是不是在时间范围内,且验证过对应的Index索引,索引大于等于1且小于Index条目数,就继续查找,不然就结束流程
if (timeDiff < 0) {
break; } timeDiff *= 1000L; long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; boolean timeMatched = (timeRead >= begin) && (timeRead <= end); if (keyHash == keyHashRead && timeMatched) { phyOffsets.add(phyOffsetRead); } |
最后还有一个不起眼的小文件
checkpoint文件,记录commitLog ConsumeQueue Index文件输盘文件的时间点
基本的数据结构为
commitLog 文件刷盘时间点
logicsMsgTimestamp 消息消费队列文件刷盘时间点
indexMsgTimestamp 索引文件刷盘时间点