RMQ通过内存映射文件提高IO性能,是零拷贝技术吗?使其无论是CommitLog ConsumeQueue IndexFile,都是来做到固定长度的,并且利用固定长度,保证能找到对应的文件,而且固定长度保证了不会爆栈,文件名就是该文件第一条消息对应的全局物理偏移量

CommitLog文件组织方式如下所示

000000000000000000000000

000000000000001073741824

在RMQ中,被实际封装为了MappendFile MappendFileQueu对象

MappendFile实际对应

而在实际中,MappendFileQueue是MappendFile的管理容器,内部属性如下

String storePath;

存储目录

int mappedFileSize;

文件大小

CopyOnWriteArrayList<MappedFile> mappedFiles

文件集合

AllocateMappedFileService allocateMappedFileService

创建MappedFile的Service

long flushedWhere

表明指针之前所有数据全部持久化

long committedWhere = 0;

大于等于上面的持久化指针,表明提交指针

接下来,我们说一下,根据不同维度来进行查找MappendFile,也就是commitLog

首先是根据时间戳来进行查找MappendFile

public MappedFile getMappedFileByTime(final long timestamp) {

Object[] mfs = this.copyMappedFiles(0);

if (null == mfs)

return null;

for (int i = 0; i < mfs.length; i++) {

MappedFile mappedFile = (MappedFile) mfs[i];

if (mappedFile.getLastModifiedTimestamp() >= timestamp) {

return mappedFile;

}

}

return (MappedFile) mfs[mfs.length – 1];

}

上面的代码中,我们会从列表中的第一个文件开始查找.找到一个最后一次更新时间大于待查找时间戳的文件,不存在,就返回最后一个文件

然后是根据消息偏移量来查找MappendFile

public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {

try {

MappedFile firstMappedFile = this.getFirstMappedFile();

MappedFile lastMappedFile = this.getLastMappedFile();

if (firstMappedFile != null && lastMappedFile != null) {

if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {

LOG_ERROR.warn(“Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}”,

offset,

firstMappedFile.getFileFromOffset(),

lastMappedFile.getFileFromOffset() + this.mappedFileSize,

this.mappedFileSize,

this.mappedFiles.size());

} else {

int index = (int) ((offset / this.mappedFileSize) – (firstMappedFile.getFileFromOffset() / this.mappedFileSize));

MappedFile targetFile = null;

try {

targetFile = this.mappedFiles.get(index);

} catch (Exception ignored) {

}

if (targetFile != null && offset >= targetFile.getFileFromOffset()

&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {

return targetFile;

}

for (MappedFile tmpMappedFile : this.mappedFiles) {

if (offset >= tmpMappedFile.getFileFromOffset()

&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {

return tmpMappedFile;

}

}

}

if (returnFirstOnNotFound) {

return firstMappedFile;

}

}

} catch (Exception e) {

log.error(“findMappedFileByOffset Exception”, e);

}

return null;

}

由于不可能将所有的文件都存于内存中,所以RMQ支持了定时删除存储文件的策略

根据offset定位MappendFile的算法为 (int)((offset / this.mappendFileSize – mappendFile.getFileFromOffset() / this.MappendFileSize))

获取当前的最小偏移量

public long getMinOffset() {

if (!this.mappedFiles.isEmpty()) {

try {

return this.mappedFiles.get(0).getFileFromOffset();

} catch (IndexOutOfBoundsException e) {

//continue;

} catch (Exception e) {

log.error(“getMinOffset has exception.”, e);

}

}

return -1;

}

返回当前第一个MappendFile的偏移量

获取存储文件的最大偏移量

public long getMaxOffset() {

MappedFile mappedFile = getLastMappedFile();

if (mappedFile != null) {

return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();

}

return 0;

}

返回最后一个文件的偏移量加上文件的读指针

获取了当前的写指针,返回最后一个文件的fileFromOffset加上当前文件的写指针

public long getMaxWrotePosition() {

MappedFile mappedFile = getLastMappedFile();

if (mappedFile != null) {

return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();

}

return 0;

}

然后是MappendFile

MappendFile是RMQ的内存映射文件的具体实现

核心属性包括

public static final int OS_PAGE_SIZE = 1024 * 4;

操作系统的每页大小 4k

private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

现在在JVM中存在的 MappendFile 虚拟内存大小

private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);

JVM中MappendFile的对象个数

protected final AtomicInteger wrotePosition = new AtomicInteger(0);

写指针,从0开始

protected final AtomicInteger committedPosition = new AtomicInteger(0);

提交指针,如果开启了transientStorePoolEnable,那么会将数据存储在TransientStorePool中,之后提交到内存映射ByteBuffer中,最后写到磁盘

private final AtomicInteger flushedPosition = new AtomicInteger(0);

刷盘指针,这个指针说明已经到哪里了

protected int fileSize;

文件大小

protected FileChannel fileChannel;

文件传输通道,看起来是正在进行刷盘用的

protected ByteBuffer writeBuffer = null;

堆外内存ByteBuffer,如果不为空,那么会将数据先存储在这里面,然后提交给MappedFile对应的Buffer

protected TransientStorePool transientStorePool = null;

堆外内存池,该内存池中内存会提供锁定内存机制

private String fileName;

文件名

private long fileFromOffset;

初始偏移量

private File file;

物理文件

private MappedByteBuffer mappedByteBuffer;

物理内存对应内存映射Buffer

private volatile long storeTimestamp = 0;

最后一次的存储的时间

private boolean firstCreateInQueue = false;

是不是MappedFileQueue中第一个文件

说完了对应的属性,我们先从MappedFile的init开始说起

在init中,由于存在是否开启transientStorePoolEnable,所以初始化存在两种情况

如果enable为true,则表示先存储在堆外内存,然后通过commit线程将数据提交到内存映射Buffer中,最后通过Flush线程将Buffer数据持久化到磁盘中

我们看下对应的init函数

//设置文件名称,即为初始的偏移量

this.fileName = fileName;

//大小

this.fileSize = fileSize;

this.file = new File(fileName);

this.fileFromOffset = Long.parseLong(this.file.getName());

boolean ok = false;

//上述创建文件

ensureDirOK(this.file.getParent());

try {

this.fileChannel = new RandomAccessFile(this.file, “rw”).getChannel();

//根据RandomAccessFile获取到文件通道

this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

//将buffer设置为ByteBuffer中

如果是enable为true

那么init中还会

this.writeBuffer = transientStorePool.borrowBuffer();

this.transientStorePool = transientStorePool;

设置对应的writeBuffer和对应的transientStorePool

2.mappedFile的提交 commit

内存映射文件的提交动作交给了MappendFile的commit方法实现

在MappedFile的commit函数中

if (writeBuffer == null) {

//no need to commit data to file channel, so just regard wrotePosition as committedPosition.

return this.wrotePosition.get();

}

如果writeBuffer为空,说明没开启堆外内存,说明应该直接给文件映射的Buffer

//chech is allow to commit

if (this.isAbleToCommit(commitLeastPages)) {

if (this.hold()) {

commit0(commitLeastPages);

this.release();

} else {

log.warn(“in commit, hold failed, commit offset = ” + this.committedPosition.get());

}

}

然后进行检测,是否超过了提交的阈值,如果操作了,进行提交操作

检测的实际函数如下

protected boolean isAbleToCommit(final int commitLeastPages) {

//分别获取到可以刷新和写的指针

int flush = this.committedPosition.get();

int write = this.wrotePosition.get();

if (this.isFull()) {

return true;

}

//判断是否达到了最小写的页数

if (commitLeastPages > 0) {

return ((write / OS_PAGE_SIZE) – (flush / OS_PAGE_SIZE)) >= commitLeastPages;

}

return write > flush;

}

实际的提交操作

创建一个writeBuffer的共享缓冲区,然后写入后,进行对应的limit

protected void commit0(final int commitLeastPages) {

//获取到写指针

int writePos = this.wrotePosition.get();

//上次提交指针

int lastCommittedPosition = this.committedPosition.get();

if (writePos – lastCommittedPosition > commitLeastPages) {

try {

//创建新的共享缓存区

ByteBuffer byteBuffer = writeBuffer.slice();

//讲共享缓存区回退到上次提交位置

byteBuffer.position(lastCommittedPosition);

//讲limit设置为wrotePosition

byteBuffer.limit(writePos);

//讲数据写入到fileChannel中

this.fileChannel.position(lastCommittedPosition);

this.fileChannel.write(byteBuffer);

//更改写指针

this.committedPosition.set(writePos);

} catch (Throwable e) {

log.error(“Error occurred when commit data to FileChannel.”, e);

}

}

}

最后更新对应的指针,完成commit操作

接下来说的就是MappedFile的刷盘操作了

将内存的数据直接写入磁盘,写盘的api函数在 flush()函数中

只要符合了条件,那么久判断是mappedByteBuffer还是writeBuffer进行写入内存

无论哪种,写完之后都应该更新指针了

flushPosition应该等于MappendByteBuffer中的写指针了

修改指针为读指针

那么接下来,我们就需要说一下获取MappedFile中最大读指针的接口

getReadPosition

RMQ的一个组织方式就是内存映射文件,预先申请一块内存,这就需要有人可以标识当前最大有效数据的位置,获取的方式就是利用getReadPosition来实现

public int getReadPosition() {

return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();

}

根据writeBuffer是否为空,来返回写指针还是提交的指针

writeBuffer为空,就说明是直接走的FileChannel,就只能利用提交的指针来确定安全

最后是关于MappedFile的销毁相关api

mappendFile的 destory/api

其入参为

public boolean destroy(/*存活的最大时间*/ final long intervalForcibly) {

在其中最终的调用就是走了本类中的shutdown

public void shutdown(final long intervalForcibly) {

if (this.available) {

this.available = false;

this.firstShutdownTimestamp = System.currentTimeMillis();

this.release();

} else if (this.getRefCount() > 0) {

if ((System.currentTimeMillis() – this.firstShutdownTimestamp) >= intervalForcibly) {

this.refCount.set(-1000 – this.getRefCount());

this.release();

}

}

}

shutdown函数中

其中会尝试关闭MappedFile

将available设置为了false

调用release释放资源,在release中,只有引用小于0 的时候,才会释放

这样,如果引用数量大于0,对比当前时间和firstshutdownTimeStamp

如果大于了最大时间,就减少1000个引用数

然后判断是否清空引用,这个在对应的release中有设置

public boolean destroy(/*存活的最大时间*/ final long intervalForcibly) {

this.shutdown(intervalForcibly);

if (this.isCleanupOver()) {

try {

this.fileChannel.close();

log.info(“close file channel ” + this.fileName + ” OK”);

long beginTime = System.currentTimeMillis();

boolean result = this.file.delete();

log.info(“delete file[REF:” + this.getRefCount() + “] ” + this.fileName

+ (result ? ” OK, ” : ” Failed, “) + “W:” + this.getWrotePosition() + ” M:”

+ this.getFlushedPosition() + “, ”

+ UtilAll.computeElapsedTimeMilliseconds(beginTime));

} catch (Exception e) {

log.warn(“close file channel ” + this.fileName + ” Failed. “, e);

}

return true;

} else {

log.warn(“destroy mapped file[REF:” + this.getRefCount() + “] ” + this.fileName

+ ” Failed. cleanupOver: ” + this.cleanupOver);

}

return false;

}

然后关闭物理通道,删除物理文件

this.fileChannel.close();

log.info(“close file channel ” + this.fileName + ” OK”);

long beginTime = System.currentTimeMillis();

boolean result = this.file.delete();

这样整体的MappedFile的Api就基本走完了

最后是TransientStorePool

一个小型的存储池,RMQ单独的创建一个MappedByteBuffer内缓存池,然后临时存储数据,数据先写到内存映射中,然后由commit线程定时的将数据从内存中复制到目的物理文件对应的内存映射中,然后定期的将数据从内存复制到物理文件的内存映射

这个机制的主要作用是提供一部分的内存锁定,将堆外的内存锁定在内存中,避免进行内存的交换

TransientStorePool的属性如下

private final int poolSize;

private final int fileSize;

fileSize,每个ByteBuffer的大小,默认为mappedFileSizeCommitLog

poolSize avaliableBuffers个数,可通过broker中配置文件设置transientStorePoolSize,默认为5个

Deque<ByteBuffer> availableBuffers,ByteBuffer容器,双端队列

发表评论

邮箱地址不会被公开。 必填项已用*标注