RMQ的存储是基于了JDK的NIO的内存映射机制,消息存储的时候先将消息追加到内存中,然后根据配置的刷盘策略在不同的时候进行写磁盘
如果是同步刷盘,直接commit之后,利用MappedByteBuffer的force进行输盘,如果是异步刷盘,则是先追加到内存中之后立刻返回,由一个单独的线程在进行磁盘操作
在broker的配置文件中,支持配置刷盘的方式,分别是ASYNC_FLUSH 异步刷盘
SYNC_FLUSH 同步刷盘,默认是异步刷盘
我们来说Commitlog的刷盘机制
首先的刷盘实现是commitLog的handleDiskFlush()方法
在putMessage中触发了这个调用关系
首先是有putMessage作为入参
然后再handleDiskFlush中,进行判断是同步还是异步
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { |
在其中,进行获取到提交的service
创建出一个GroupCommitRequest,用于提交的数据结构,在这个数据结构中的属性主要有
private final long nextOffset;
偏移量
private CompletableFuture<PutMessageStatus> flushOKFuture = new
CompletableFuture<>();
Completable用于异步或者同步通信
private final long startTimestamp = System.currentTimeMillis();
开始时间
private long timeoutMillis = Long.MAX_VALUE;
最大超时时间
对应的service api为
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); |
为什么分了两个队列呢?
这是一种天然的避免读写冲突的方式
在代码中,我们将上面封装好的Request放入了这个service的Write队列中,放入的方法为
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) { this.requestsWrite.add(request); } this.wakeup(); } |
同步的形式提交任务到GroupcommitService
然后在其中调用了wakeup唤醒这个线程
public void wakeup() {
if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify } } |
将waitPoint设置为0,进行唤醒
在唤醒之后 Service的父类ServiceThread会调用抽象方法 onWaitEnd,交由实际的实现者
GroupCommitService进行执行,其中进行交换读和写的队列
private void swapRequests() {
//这一步交换读写容器,感觉应该是避免读写的不一致,方便提交的时候继续放入提交请求 List<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } |
然后在run函数中,一旦完成了唤醒,就进行提交了
在doCommit函数中,是实际的执行代码
private void doCommit() {
synchronized (this.requestsRead) { //交换后的requestRead,避免了并发问题 if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush //查看是不是已经刷盘了 boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); for (int i = 0; i < 2 && !flushOK; i++) { //实际执行刷盘 CommitLog.this.mappedFileQueue.flush(0); flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); } //进行推送 req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { //刷盘完成之后,更新刷盘点StoreCheckpoint中的physicMsgTimeStamp CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } } |
上述的就是整个同步刷盘的流程和机制
接下来是Broker进行异步的输盘机制
在异步的刷盘中,分为了是否开启堆外内存的方式
如果开启了堆外内存,那么刷盘的流程为
现将消息追加到ByteBuffer中
然后CommitRealTimeService线程默认每200ms就讲ByteBuffer中的内容提交到FileChannel
FlushReadTimeService每500秒进行一次写入磁盘,实际代码如下
flushCommitLogService.wakeup();
在内部唤醒了run函数
@Override
public void run() { CommitLog.log.info(this.getServiceName() + ” service started”); while (!this.isStopped()) { //等待时间,也就是唤醒时间 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); //一次任务最少提交的页数 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); //两次提交的最大间隔 int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); //忽略页数的提交 long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; } try { //执行提交操作.也就是将提交数据提交到物理文件的内存映射内存 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { //如果为false,就延长输盘时间 this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. flushCommitLogService.wakeup(); } if (end – begin > 500) { log.info(“Commit data to file costs {} ms”, end – begin); } this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + ” service has exception. “, e); } } |
接下来是刷盘的机制
public void run() {
CommitLog.log.info(this.getServiceName() + ” service started”); while (!this.isStopped()) { //是否使用Thread.sleep方法,如果为false 表示为await方法进行等待 boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); //刷盘间隔 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); //一次刷盘的包含页数 int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); //真实刷盘的最大间隔 int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; // Print flush progress // 达到最小忽略提交时间 long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; flushPhysicQueueLeastPages = 0; printFlushProgress = (printTimes++ % 10) == 0; } try { if (flushCommitLogTimed) { Thread.sleep(interval); } else { this.waitForRunning(interval); } //先等待一定时间,再执行输盘任务 if (printFlushProgress) { this.printFlushProgress(); } long begin = System.currentTimeMillis(); //进行实际的刷新 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { //更新更新时间戳 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } long past = System.currentTimeMillis() – begin; if (past > 500) { log.info(“Flush data to disk costs {} ms”, past); } } catch (Throwable e) { CommitLog.log.warn(this.getServiceName() + ” service has exception. “, e); this.printFlushProgress(); } |
进行相关的刷盘操作
这样就完成了整体的文件写入