RMQ的文件机制交由CommitLog和ConsumeQueue实现的,在启动的时候,会加载commitlog和consumeQueue下的所有文件,为了避免浪费内存,不会将一个文件永久存储在服务器上,如果一个文件在一定时间内没有再次被更新,则认为是过期文件,可以被删除,RMQ不会关注这个文件上消息是否能够被全部消费,认为如果文件超过了72小时,就进行删除,这个时间可以通过

fileReseverdTime进行配置

我们就来看下这个过期文件删除的机制如何实现的

入口在DefaultMessageStore中的addScheduleTask实现

在整个函数中,进行一个定时线程的触发

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

DefaultMessageStore.this.cleanFilesPeriodically();

}

}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

其中的cleanFilesPeriodically就是实际的执行者,代码中声明了,每隔10s中进行一次调用,

在其中,分别调用了

private void cleanFilesPeriodically() {

this.cleanCommitLogService.run();

this.cleanConsumeQueueService.run();

}

分别是请求消息存储文件 commitLog

消息队列消费文件 ConsumeQueue

这一次,我们以CommitLog为主线进行讲解

主要的实现方式

CleanCommitLogService#run

在run函数中首先是deleteExpiredFiles,其中

首先是获取一些对应的配置属性

//刪除數量

int deleteCount = 0;

//文件保留时间.是根据文件更新时间戳进行判断过期的依据

long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();

//删除文件的间隔,多删的时候,指定删除的每次间隔

int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();

//如果有文件拒绝被删除,则记录这个时间,在下面的规定时间间隔到了之后,强制删除

int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

然后判断,是否删除的条件

需要判断三个,三个只要满足一个就可以进行删除

//三种情况满足其一,进行删除

//指定删除文件的时间点,deleteWhen设置一个固定删除的时间点,默认是凌晨4点

boolean timeup = this.isTimeToDelete();

//磁盘空间是否充足

boolean spacefull = this. isSpaceToDelete();

//手动触发,暂无提供响应的指令

boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

其中,计算磁盘空间是否充足的函数详情如下

//检测磁盘是否充足

private boolean isSpaceToDelete() {

//DiskMaxUsedSpaceRatio 最大使用率

double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;

//是否立刻执行清除过期文件操作

cleanImmediately = false;

{

//获取到物理使用率

double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());

if (physicRatio > diskSpaceWarningLevelRatio) {

boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();

if (diskok) {

DefaultMessageStore.log.error(“physic disk maybe full soon ” + physicRatio + “, so mark disk full”);

}

cleanImmediately = true;

} else if (physicRatio > diskSpaceCleanForciblyRatio) {

cleanImmediately = true;

} else {

boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();

if (!diskok) {

DefaultMessageStore.log.info(“physic disk space OK ” + physicRatio + “, so mark disk ok”);

}

}

if (physicRatio < 0 || physicRatio > ratio) {

DefaultMessageStore.log.info(“physic disk maybe full soon, so reclaim space, ” + physicRatio);

return true;

}

}

获取到物理磁盘的使用率进行对比,获得是否可以删除的结果

接下来走到进行删除的函数中

deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,

destroyMapedFileIntervalForcibly, cleanAtOnce);

一路走到对应的

deleteExpiredFileByTime函数中

public int deleteExpiredFileByTime(final long expiredTime,

final int deleteFilesInterval,

final long intervalForcibly,

final boolean cleanImmediately) {

Object[] mfs = this.copyMappedFiles(0);

//拿到所有的file

if (null == mfs)

return 0;

int mfsLength = mfs.length – 1;

int deleteCount = 0;

List<MappedFile> files = new ArrayList<MappedFile>();

if (null != mfs) {

//进行遍历

for (int i = 0; i < mfsLength; i++) {

MappedFile mappedFile = (MappedFile) mfs[i];

//获取到文件最后一次更新时间,然后加上可以存活时间

long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;

//如果当前时间大于了上面计算结果

if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {

//进行删除

if (mappedFile.destroy(intervalForcibly)) {

files.add(mappedFile);

deleteCount++;

if (files.size() >= DELETE_FILES_BATCH_MAX) {

break;

}

if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {

try {

Thread.sleep(deleteFilesInterval);

} catch (InterruptedException e) {

}

}

} else {

break;

}

} else {

//avoid deleting files in the middle

break;

}

}

}

执行相关的删除操作

发表评论

邮箱地址不会被公开。 必填项已用*标注