RMQ通过内存映射文件提高IO性能,是零拷贝技术吗?使其无论是CommitLog ConsumeQueue IndexFile,都是来做到固定长度的,并且利用固定长度,保证能找到对应的文件,而且固定长度保证了不会爆栈,文件名就是该文件第一条消息对应的全局物理偏移量
CommitLog文件组织方式如下所示
000000000000000000000000
000000000000001073741824
在RMQ中,被实际封装为了MappendFile MappendFileQueu对象
MappendFile实际对应
而在实际中,MappendFileQueue是MappendFile的管理容器,内部属性如下
String storePath;
存储目录
int mappedFileSize;
文件大小
CopyOnWriteArrayList<MappedFile> mappedFiles
文件集合
AllocateMappedFileService allocateMappedFileService
创建MappedFile的Service
long flushedWhere
表明指针之前所有数据全部持久化
long committedWhere = 0;
大于等于上面的持久化指针,表明提交指针
接下来,我们说一下,根据不同维度来进行查找MappendFile,也就是commitLog
首先是根据时间戳来进行查找MappendFile
public MappedFile getMappedFileByTime(final long timestamp) {
Object[] mfs = this.copyMappedFiles(0); if (null == mfs) return null; for (int i = 0; i < mfs.length; i++) { MappedFile mappedFile = (MappedFile) mfs[i]; if (mappedFile.getLastModifiedTimestamp() >= timestamp) { return mappedFile; } } return (MappedFile) mfs[mfs.length – 1]; } |
上面的代码中,我们会从列表中的第一个文件开始查找.找到一个最后一次更新时间大于待查找时间戳的文件,不存在,就返回最后一个文件
然后是根据消息偏移量来查找MappendFile
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try { MappedFile firstMappedFile = this.getFirstMappedFile(); MappedFile lastMappedFile = this.getLastMappedFile(); if (firstMappedFile != null && lastMappedFile != null) { if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) { LOG_ERROR.warn(“Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}”, offset, firstMappedFile.getFileFromOffset(), lastMappedFile.getFileFromOffset() + this.mappedFileSize, this.mappedFileSize, this.mappedFiles.size()); } else { int index = (int) ((offset / this.mappedFileSize) – (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); MappedFile targetFile = null; try { targetFile = this.mappedFiles.get(index); } catch (Exception ignored) { } if (targetFile != null && offset >= targetFile.getFileFromOffset() && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { return targetFile; } for (MappedFile tmpMappedFile : this.mappedFiles) { if (offset >= tmpMappedFile.getFileFromOffset() && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { return tmpMappedFile; } } } if (returnFirstOnNotFound) { return firstMappedFile; } } } catch (Exception e) { log.error(“findMappedFileByOffset Exception”, e); } return null; } |
由于不可能将所有的文件都存于内存中,所以RMQ支持了定时删除存储文件的策略
根据offset定位MappendFile的算法为 (int)((offset / this.mappendFileSize – mappendFile.getFileFromOffset() / this.MappendFileSize))
获取当前的最小偏移量
public long getMinOffset() {
if (!this.mappedFiles.isEmpty()) { try { return this.mappedFiles.get(0).getFileFromOffset(); } catch (IndexOutOfBoundsException e) { //continue; } catch (Exception e) { log.error(“getMinOffset has exception.”, e); } } return -1; } |
返回当前第一个MappendFile的偏移量
获取存储文件的最大偏移量
public long getMaxOffset() {
MappedFile mappedFile = getLastMappedFile(); if (mappedFile != null) { return mappedFile.getFileFromOffset() + mappedFile.getReadPosition(); } return 0; } |
返回最后一个文件的偏移量加上文件的读指针
获取了当前的写指针,返回最后一个文件的fileFromOffset加上当前文件的写指针
public long getMaxWrotePosition() {
MappedFile mappedFile = getLastMappedFile(); if (mappedFile != null) { return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); } return 0; } |
然后是MappendFile
MappendFile是RMQ的内存映射文件的具体实现
核心属性包括
public static final int OS_PAGE_SIZE = 1024 * 4;
操作系统的每页大小 4k
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
现在在JVM中存在的 MappendFile 虚拟内存大小
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
JVM中MappendFile的对象个数
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
写指针,从0开始
protected final AtomicInteger committedPosition = new AtomicInteger(0);
提交指针,如果开启了transientStorePoolEnable,那么会将数据存储在TransientStorePool中,之后提交到内存映射ByteBuffer中,最后写到磁盘
private final AtomicInteger flushedPosition = new AtomicInteger(0);
刷盘指针,这个指针说明已经到哪里了
protected int fileSize;
文件大小
protected FileChannel fileChannel;
文件传输通道,看起来是正在进行刷盘用的
protected ByteBuffer writeBuffer = null;
堆外内存ByteBuffer,如果不为空,那么会将数据先存储在这里面,然后提交给MappedFile对应的Buffer
protected TransientStorePool transientStorePool = null;
堆外内存池,该内存池中内存会提供锁定内存机制
private String fileName;
文件名
private long fileFromOffset;
初始偏移量
private File file;
物理文件
private MappedByteBuffer mappedByteBuffer;
物理内存对应内存映射Buffer
private volatile long storeTimestamp = 0;
最后一次的存储的时间
private boolean firstCreateInQueue = false;
是不是MappedFileQueue中第一个文件
说完了对应的属性,我们先从MappedFile的init开始说起
在init中,由于存在是否开启transientStorePoolEnable,所以初始化存在两种情况
如果enable为true,则表示先存储在堆外内存,然后通过commit线程将数据提交到内存映射Buffer中,最后通过Flush线程将Buffer数据持久化到磁盘中
我们看下对应的init函数
//设置文件名称,即为初始的偏移量
this.fileName = fileName; //大小 this.fileSize = fileSize; this.file = new File(fileName); this.fileFromOffset = Long.parseLong(this.file.getName()); boolean ok = false; //上述创建文件 ensureDirOK(this.file.getParent()); try { this.fileChannel = new RandomAccessFile(this.file, “rw”).getChannel(); //根据RandomAccessFile获取到文件通道 this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); //将buffer设置为ByteBuffer中 |
如果是enable为true
那么init中还会
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool; |
设置对应的writeBuffer和对应的transientStorePool
2.mappedFile的提交 commit
内存映射文件的提交动作交给了MappendFile的commit方法实现
在MappedFile的commit函数中
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition. return this.wrotePosition.get(); } |
如果writeBuffer为空,说明没开启堆外内存,说明应该直接给文件映射的Buffer
//chech is allow to commit
if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { commit0(commitLeastPages); this.release(); } else { log.warn(“in commit, hold failed, commit offset = ” + this.committedPosition.get()); } } |
然后进行检测,是否超过了提交的阈值,如果操作了,进行提交操作
检测的实际函数如下
protected boolean isAbleToCommit(final int commitLeastPages) {
//分别获取到可以刷新和写的指针 int flush = this.committedPosition.get(); int write = this.wrotePosition.get(); if (this.isFull()) { return true; } //判断是否达到了最小写的页数 if (commitLeastPages > 0) { return ((write / OS_PAGE_SIZE) – (flush / OS_PAGE_SIZE)) >= commitLeastPages; } return write > flush; } |
实际的提交操作
创建一个writeBuffer的共享缓冲区,然后写入后,进行对应的limit
protected void commit0(final int commitLeastPages) {
//获取到写指针 int writePos = this.wrotePosition.get(); //上次提交指针 int lastCommittedPosition = this.committedPosition.get(); if (writePos – lastCommittedPosition > commitLeastPages) { try { //创建新的共享缓存区 ByteBuffer byteBuffer = writeBuffer.slice(); //讲共享缓存区回退到上次提交位置 byteBuffer.position(lastCommittedPosition); //讲limit设置为wrotePosition byteBuffer.limit(writePos); //讲数据写入到fileChannel中 this.fileChannel.position(lastCommittedPosition); this.fileChannel.write(byteBuffer); //更改写指针 this.committedPosition.set(writePos); } catch (Throwable e) { log.error(“Error occurred when commit data to FileChannel.”, e); } } } |
最后更新对应的指针,完成commit操作
接下来说的就是MappedFile的刷盘操作了
将内存的数据直接写入磁盘,写盘的api函数在 flush()函数中
只要符合了条件,那么久判断是mappedByteBuffer还是writeBuffer进行写入内存
无论哪种,写完之后都应该更新指针了
flushPosition应该等于MappendByteBuffer中的写指针了
修改指针为读指针
那么接下来,我们就需要说一下获取MappedFile中最大读指针的接口
getReadPosition
RMQ的一个组织方式就是内存映射文件,预先申请一块内存,这就需要有人可以标识当前最大有效数据的位置,获取的方式就是利用getReadPosition来实现
public int getReadPosition() {
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get(); } |
根据writeBuffer是否为空,来返回写指针还是提交的指针
writeBuffer为空,就说明是直接走的FileChannel,就只能利用提交的指针来确定安全
最后是关于MappedFile的销毁相关api
mappendFile的 destory/api
其入参为
public boolean destroy(/*存活的最大时间*/ final long intervalForcibly) {
在其中最终的调用就是走了本类中的shutdown
public void shutdown(final long intervalForcibly) {
if (this.available) { this.available = false; this.firstShutdownTimestamp = System.currentTimeMillis(); this.release(); } else if (this.getRefCount() > 0) { if ((System.currentTimeMillis() – this.firstShutdownTimestamp) >= intervalForcibly) { this.refCount.set(-1000 – this.getRefCount()); this.release(); } } } |
shutdown函数中
其中会尝试关闭MappedFile
将available设置为了false
调用release释放资源,在release中,只有引用小于0 的时候,才会释放
这样,如果引用数量大于0,对比当前时间和firstshutdownTimeStamp
如果大于了最大时间,就减少1000个引用数
然后判断是否清空引用,这个在对应的release中有设置
public boolean destroy(/*存活的最大时间*/ final long intervalForcibly) {
this.shutdown(intervalForcibly); if (this.isCleanupOver()) { try { this.fileChannel.close(); log.info(“close file channel ” + this.fileName + ” OK”); long beginTime = System.currentTimeMillis(); boolean result = this.file.delete(); log.info(“delete file[REF:” + this.getRefCount() + “] ” + this.fileName + (result ? ” OK, ” : ” Failed, “) + “W:” + this.getWrotePosition() + ” M:” + this.getFlushedPosition() + “, ” + UtilAll.computeElapsedTimeMilliseconds(beginTime)); } catch (Exception e) { log.warn(“close file channel ” + this.fileName + ” Failed. “, e); } return true; } else { log.warn(“destroy mapped file[REF:” + this.getRefCount() + “] ” + this.fileName + ” Failed. cleanupOver: ” + this.cleanupOver); } return false; } |
然后关闭物理通道,删除物理文件
this.fileChannel.close();
log.info(“close file channel ” + this.fileName + ” OK”);
long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();
这样整体的MappedFile的Api就基本走完了
最后是TransientStorePool
一个小型的存储池,RMQ单独的创建一个MappedByteBuffer内缓存池,然后临时存储数据,数据先写到内存映射中,然后由commit线程定时的将数据从内存中复制到目的物理文件对应的内存映射中,然后定期的将数据从内存复制到物理文件的内存映射
这个机制的主要作用是提供一部分的内存锁定,将堆外的内存锁定在内存中,避免进行内存的交换
TransientStorePool的属性如下
private final int poolSize;
private final int fileSize;
fileSize,每个ByteBuffer的大小,默认为mappedFileSizeCommitLog
poolSize avaliableBuffers个数,可通过broker中配置文件设置transientStorePoolSize,默认为5个
Deque<ByteBuffer> availableBuffers,ByteBuffer容器,双端队列