RMQ的存储是基于了JDK的NIO的内存映射机制,消息存储的时候先将消息追加到内存中,然后根据配置的刷盘策略在不同的时候进行写磁盘

如果是同步刷盘,直接commit之后,利用MappedByteBuffer的force进行输盘,如果是异步刷盘,则是先追加到内存中之后立刻返回,由一个单独的线程在进行磁盘操作

在broker的配置文件中,支持配置刷盘的方式,分别是ASYNC_FLUSH 异步刷盘

SYNC_FLUSH 同步刷盘,默认是异步刷盘

我们来说Commitlog的刷盘机制

首先的刷盘实现是commitLog的handleDiskFlush()方法

在putMessage中触发了这个调用关系

首先是有putMessage作为入参

然后再handleDiskFlush中,进行判断是同步还是异步

if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {

在其中,进行获取到提交的service

创建出一个GroupCommitRequest,用于提交的数据结构,在这个数据结构中的属性主要有

private final long nextOffset;

偏移量

private CompletableFuture<PutMessageStatus> flushOKFuture = new

CompletableFuture<>();

Completable用于异步或者同步通信

private final long startTimestamp = System.currentTimeMillis();

开始时间

private long timeoutMillis = Long.MAX_VALUE;

最大超时时间

对应的service api为

private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();

private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

为什么分了两个队列呢?

这是一种天然的避免读写冲突的方式

在代码中,我们将上面封装好的Request放入了这个service的Write队列中,放入的方法为

public synchronized void putRequest(final GroupCommitRequest request) {

synchronized (this.requestsWrite) {

this.requestsWrite.add(request);

}

this.wakeup();

}

同步的形式提交任务到GroupcommitService

然后在其中调用了wakeup唤醒这个线程

public void wakeup() {

if (hasNotified.compareAndSet(false, true)) {

waitPoint.countDown(); // notify

}

}

将waitPoint设置为0,进行唤醒

在唤醒之后 Service的父类ServiceThread会调用抽象方法 onWaitEnd,交由实际的实现者

GroupCommitService进行执行,其中进行交换读和写的队列

private void swapRequests() {

//这一步交换读写容器,感觉应该是避免读写的不一致,方便提交的时候继续放入提交请求

List<GroupCommitRequest> tmp = this.requestsWrite;

this.requestsWrite = this.requestsRead;

this.requestsRead = tmp;

}

然后在run函数中,一旦完成了唤醒,就进行提交了

在doCommit函数中,是实际的执行代码

private void doCommit() {

synchronized (this.requestsRead) {

//交换后的requestRead,避免了并发问题

if (!this.requestsRead.isEmpty()) {

for (GroupCommitRequest req : this.requestsRead) {

// There may be a message in the next file, so a maximum of

// two times the flush

//查看是不是已经刷盘了

boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

for (int i = 0; i < 2 && !flushOK; i++) {

//实际执行刷盘

CommitLog.this.mappedFileQueue.flush(0);

flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

}

//进行推送

req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);

}

long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();

if (storeTimestamp > 0) {

//刷盘完成之后,更新刷盘点StoreCheckpoint中的physicMsgTimeStamp

CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);

}

this.requestsRead.clear();

} else {

// Because of individual messages is set to not sync flush, it

// will come to this process

CommitLog.this.mappedFileQueue.flush(0);

}

}

}

上述的就是整个同步刷盘的流程和机制

接下来是Broker进行异步的输盘机制

在异步的刷盘中,分为了是否开启堆外内存的方式

如果开启了堆外内存,那么刷盘的流程为

现将消息追加到ByteBuffer中

然后CommitRealTimeService线程默认每200ms就讲ByteBuffer中的内容提交到FileChannel

FlushReadTimeService每500秒进行一次写入磁盘,实际代码如下

flushCommitLogService.wakeup();

在内部唤醒了run函数

@Override

public void run() {

CommitLog.log.info(this.getServiceName() + ” service started”);

while (!this.isStopped()) {

//等待时间,也就是唤醒时间

int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

//一次任务最少提交的页数

int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

//两次提交的最大间隔

int commitDataThoroughInterval =

CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

//忽略页数的提交

long begin = System.currentTimeMillis();

if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {

this.lastCommitTimestamp = begin;

commitDataLeastPages = 0;

}

try {

//执行提交操作.也就是将提交数据提交到物理文件的内存映射内存

boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);

long end = System.currentTimeMillis();

if (!result) {

//如果为false,就延长输盘时间

this.lastCommitTimestamp = end; // result = false means some data committed.

//now wake up flush thread.

flushCommitLogService.wakeup();

}

if (end – begin > 500) {

log.info(“Commit data to file costs {} ms”, end – begin);

}

this.waitForRunning(interval);

} catch (Throwable e) {

CommitLog.log.error(this.getServiceName() + ” service has exception. “, e);

}

}

接下来是刷盘的机制

public void run() {

CommitLog.log.info(this.getServiceName() + ” service started”);

while (!this.isStopped()) {

//是否使用Thread.sleep方法,如果为false 表示为await方法进行等待

boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

//刷盘间隔

int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();

//一次刷盘的包含页数

int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

//真实刷盘的最大间隔

int flushPhysicQueueThoroughInterval =

CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

boolean printFlushProgress = false;

// Print flush progress

// 达到最小忽略提交时间

long currentTimeMillis = System.currentTimeMillis();

if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {

this.lastFlushTimestamp = currentTimeMillis;

flushPhysicQueueLeastPages = 0;

printFlushProgress = (printTimes++ % 10) == 0;

}

try {

if (flushCommitLogTimed) {

Thread.sleep(interval);

} else {

this.waitForRunning(interval);

}

//先等待一定时间,再执行输盘任务

if (printFlushProgress) {

this.printFlushProgress();

}

long begin = System.currentTimeMillis();

//进行实际的刷新

CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();

if (storeTimestamp > 0) {

//更新更新时间戳

CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);

}

long past = System.currentTimeMillis() – begin;

if (past > 500) {

log.info(“Flush data to disk costs {} ms”, past);

}

} catch (Throwable e) {

CommitLog.log.warn(this.getServiceName() + ” service has exception. “, e);

this.printFlushProgress();

}

进行相关的刷盘操作

这样就完成了整体的文件写入

发表评论

邮箱地址不会被公开。 必填项已用*标注