RMQ的文件机制交由CommitLog和ConsumeQueue实现的,在启动的时候,会加载commitlog和consumeQueue下的所有文件,为了避免浪费内存,不会将一个文件永久存储在服务器上,如果一个文件在一定时间内没有再次被更新,则认为是过期文件,可以被删除,RMQ不会关注这个文件上消息是否能够被全部消费,认为如果文件超过了72小时,就进行删除,这个时间可以通过
fileReseverdTime进行配置
我们就来看下这个过期文件删除的机制如何实现的
入口在DefaultMessageStore中的addScheduleTask实现
在整个函数中,进行一个定时线程的触发
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { DefaultMessageStore.this.cleanFilesPeriodically(); } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); |
其中的cleanFilesPeriodically就是实际的执行者,代码中声明了,每隔10s中进行一次调用,
在其中,分别调用了
private void cleanFilesPeriodically() {
this.cleanCommitLogService.run(); this.cleanConsumeQueueService.run(); } |
分别是请求消息存储文件 commitLog
消息队列消费文件 ConsumeQueue
这一次,我们以CommitLog为主线进行讲解
主要的实现方式
CleanCommitLogService#run
在run函数中首先是deleteExpiredFiles,其中
首先是获取一些对应的配置属性
//刪除數量
int deleteCount = 0; //文件保留时间.是根据文件更新时间戳进行判断过期的依据 long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); //删除文件的间隔,多删的时候,指定删除的每次间隔 int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); //如果有文件拒绝被删除,则记录这个时间,在下面的规定时间间隔到了之后,强制删除 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); |
然后判断,是否删除的条件
需要判断三个,三个只要满足一个就可以进行删除
//三种情况满足其一,进行删除
//指定删除文件的时间点,deleteWhen设置一个固定删除的时间点,默认是凌晨4点 boolean timeup = this.isTimeToDelete(); //磁盘空间是否充足 boolean spacefull = this. isSpaceToDelete(); //手动触发,暂无提供响应的指令 boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; |
其中,计算磁盘空间是否充足的函数详情如下
//检测磁盘是否充足
private boolean isSpaceToDelete() { //DiskMaxUsedSpaceRatio 最大使用率 double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; //是否立刻执行清除过期文件操作 cleanImmediately = false; { //获取到物理使用率 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); if (physicRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error(“physic disk maybe full soon ” + physicRatio + “, so mark disk full”); } cleanImmediately = true; } else if (physicRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info(“physic disk space OK ” + physicRatio + “, so mark disk ok”); } } if (physicRatio < 0 || physicRatio > ratio) { DefaultMessageStore.log.info(“physic disk maybe full soon, so reclaim space, ” + physicRatio); return true; } } |
获取到物理磁盘的使用率进行对比,获得是否可以删除的结果
接下来走到进行删除的函数中
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMapedFileIntervalForcibly, cleanAtOnce); |
一路走到对应的
deleteExpiredFileByTime函数中
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval, final long intervalForcibly, final boolean cleanImmediately) { Object[] mfs = this.copyMappedFiles(0); //拿到所有的file if (null == mfs) return 0; int mfsLength = mfs.length – 1; int deleteCount = 0; List<MappedFile> files = new ArrayList<MappedFile>(); if (null != mfs) { //进行遍历 for (int i = 0; i < mfsLength; i++) { MappedFile mappedFile = (MappedFile) mfs[i]; //获取到文件最后一次更新时间,然后加上可以存活时间 long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; //如果当前时间大于了上面计算结果 if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { //进行删除 if (mappedFile.destroy(intervalForcibly)) { files.add(mappedFile); deleteCount++; if (files.size() >= DELETE_FILES_BATCH_MAX) { break; } if (deleteFilesInterval > 0 && (i + 1) < mfsLength) { try { Thread.sleep(deleteFilesInterval); } catch (InterruptedException e) { } } } else { break; } } else { //avoid deleting files in the middle break; } } } |
执行相关的删除操作