RMQ将所有的消息存储在CommitLog中,然后异步的一毫秒一次的进行同步到ConsumeQueue和Index文件,这就可能导致一个问题,就是如果异常宕机了的话,会导致CommitLog ConsumeQueue IndexFile三者不一致的情况,如果不加以人工修复,就会有一部分的消息永远在commitLog中存在,但是由于ConsumeQueue中没有,那么就不会被消费到
这一致性的修复工作,可以从DefaultMessageStore的load开始
在这一步骤中,需要判断上一次的推出是否是正常推出
boolean lastExitOK = !this.isTempFileExist(); |
关于这个检测文件的api,实现的机制就是在Broker启动的时候创建 ${ROCKET_HOME}/store/abort文件,推出的时候注册JVM钩子函数进行删除abort文件,然后这里面检测这个abort文件,判断是否是异常推出的
如果正常的退出的话,会调用shutdown
if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir())); shutDownNormal = true; |
加载延迟队列
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load(); } |
然后初始化commitLog
commitLog的load中进行mappedQueue的load中
在对应的MappedQueue的load中
进行相关的加载工作
public boolean load() {
File dir = new File(this.storePath); File[] files = dir.listFiles(); if (files != null) { // ascending order //进行排序 Arrays.sort(files); for (File file : files) { //如果文件大小和配置的单个大小不一致,就进行推出 if (file.length() != this.mappedFileSize) { log.warn(file + “\t” + file.length() + ” length not matched message store config value, please check it manually”); return false; } try { //开始常见内存中的mappedFile MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); //都设置为文件上限 mappedFile.setWrotePosition(this.mappedFileSize); mappedFile.setFlushedPosition(this.mappedFileSize); mappedFile.setCommittedPosition(this.mappedFileSize); this.mappedFiles.add(mappedFile); log.info(“load ” + file.getPath() + ” OK”); } catch (IOException e) { log.error(“load file ” + file + ” error”, e); return false; } } } return true; } |
首先是加载对应的存储路径
获取这个目录下的所有文件
然后根据文件名称进行排序,从小到大
之后初始化MappedFile,设置文件上限,设置storePath mappedFilSize等
之后是加载ConsumeQueue
调用了loadConsumeQueue
private boolean loadConsumeQueue() {
//获取到目录 File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir())); //目录下所有的file File[] fileTopicList = dirLogic.listFiles(); if (fileTopicList != null) { //获取到该Borker存储的所有主题,进行遍历主题目录 for (File fileTopic : fileTopicList) { //得到了主题 String topic = fileTopic.getName(); //之后得到这个主题下的文件,进行遍历 File[] fileQueueIdList = fileTopic.listFiles(); if (fileQueueIdList != null) { for (File fileQueueId : fileQueueIdList) { int queueId; try { //尝试获取到消费队列Id queueId = Integer.parseInt(fileQueueId.getName()); } catch (NumberFormatException e) { continue; } //常见消费队列内存数据结构 ConsumeQueue logic = new ConsumeQueue( topic, queueId, StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(), this); //主要初始化的属性有 topic queueId storePath mappedFileSize属性 this.putConsumeQueue(topic, queueId, logic); if (!logic.load()) { return false; } } } } } |
加载到存储检测点
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); |
因为Checkpoint存储了commitLog ConsumeQueue Index的刷盘点,故根据其进行恢复
如果是存在异常退出,则进行index恢复工作
this.indexService进行load操作
同样是加载indexService到内存中,不过如果lastExitOK是flase说明是异常退出的
如果索引文件的刷盘时间小于该索引文件的最大消息时间戳则进行文件的销毁
然后进行对应的恢复工作
对应的恢复分为了正常恢复和异常恢复
我们分别介绍不同的恢复逻辑
对于正常及不正常的恢复我们先按下不表
说一下恢复ConsumeQueue之后
将在ComsumeQueue恢复之后设置commitLog中存储主题,消息队列ID 还存储了消息队列偏移量
public void recoverTopicQueueTable() {
HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024); long minPhyOffset = this.commitLog.getMinOffset(); for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { String key = logic.getTopic() + “-” + logic.getQueueId(); table.put(key, logic.getMaxOffsetInQueue()); logic.correctMinOffset(minPhyOffset); } } this.commitLog.setTopicQueueTable(table); } |
然后进行异常文件恢复
对于异常文件的恢复,实现是CommitLog#recoverAbnormally,异常恢复的步骤中,主要涉及到先找到第一个存储正常的文件
其次,如果commitlog目录没有文件,则需要销毁消息消费文件
那么如何判断一个消息是正确的文件呢?
//获取到所有的MappedFiles
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { // Looking beginning to recover from which file int index = mappedFiles.size() – 1; MappedFile mappedFile = null; for (; index >= 0; index–) { mappedFile = mappedFiles.get(index); if (this.isMappedFileMatchedRecover(mappedFile)) { log.info(“recover from this mapped file ” + mappedFile.getFileName()); break; } } |
首先获取所有的文件,然后从后往前,判断哪个可以进行恢复的基准
具体的判断api如下
//此方法成功了,说明从这个开始恢复
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) { //是否需要恢复 ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); //首先是检查魔数 int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION); if (magicCode != MESSAGE_MAGIC_CODE) { return false; } int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION); int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20; int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornhostLength; //获取到初始化时间 long storeTimestamp = byteBuffer.getLong(msgStoreTimePos); if (0 == storeTimestamp) { //如果为0,说明文件不可用 return false; } //判断时间戳,如果时间戳是小于文件检查点的时间戳,那么就说明是可靠的 if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) { if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) { log.info(“find check timestamp, {} {}”, storeTimestamp, UtilAll.timeMillisToHumanString(storeTimestamp)); return true; } } else { if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) { log.info(“find check timestamp, {} {}”, storeTimestamp, UtilAll.timeMillisToHumanString(storeTimestamp)); return true; } } return false; } |
如果找到了,就进行恢复操作
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset(); long mappedFileOffset = 0; while (true) { DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { // Normal data if (size > 0) { mappedFileOffset += size; if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) { this.defaultMessageStore.doDispatch(dispatchRequest); } } else { this.defaultMessageStore.doDispatch(dispatchRequest); } } |
如果没有找到,则进行销毁
else {
log.warn(“The commitlog files are deleted, and delete the consume queue files”); this.mappedFileQueue.setFlushedWhere(0); this.mappedFileQueue.setCommittedWhere(0); this.defaultMessageStore.destroyLogics(); } |
从上述的代码可以看出
RMQ保证消息的不丢失,但是不保证不会重复消费
故消费者需要实现消息消费的幂等性