这次我们先整体梳理下消息存储流程
消息存储实现类是 org.apache.rocketmq.store.DefaultMessageStore
是核心类库,存储了大量的对其他文件操作的api
核心属性包括:
1.MessageStoreConfig messageStoreConfig 消息存储配置属性
2.CommitLog commitLog文件的实现类
3.ConcurrentMap<String,ConcurrentMap<Integer,ConsumeQueue>> consumeQueueTable 消息队列存储缓存表,按照消息主题分组
4.FlushConsumeQueueService 刷盘线程
5.CleanCommitLogService 清除CommitLog文件服务
6.CleanCommitQueueService 清除ConsumeQueue文件服务
7.IndexService 索引文件实现类
8.AllocateMappendFileService MappendFile分配服务
9.ReputMessageService 消息分发,根据CommitLog构建对应的ConsumeQueue,IndexFile文件
10.HAService 存储HA机制
11.MessageArrivingListener, 消息拉去长轮询模式达到监听器
12.BrokerConfig brokerConfig的配置
13.StoreCheckpoint 文件刷盘检测点
14.LinkedList dispatcherList 转发请求列表
整体流程为
整体消息存储的入口为DefaultMessageStore#putMessage,我们也是拿着同步存储进行举例的
函数中,首先进行校验
private PutMessageStatus checkStoreStatus() {
//如果已经关闭了 if (this.shutdown) { log.warn(“message store has shutdown, so putMessage is forbidden”); return PutMessageStatus.SERVICE_NOT_AVAILABLE; } //如果为Slave角色 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn(“broke role is slave, so putMessage is forbidden”); } return PutMessageStatus.SERVICE_NOT_AVAILABLE; } //如果不支持写入 if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn(“the message store is not writable. It may be caused by one of the following reasons: ” + “the broker’s disk is full, write to logic queue error, write to index file error, etc”); } return PutMessageStatus.SERVICE_NOT_AVAILABLE; } else { this.printTimes.set(0); } if (this.isOSPageCacheBusy()) { return PutMessageStatus.OS_PAGECACHE_BUSY; } return PutMessageStatus.PUT_OK; } |
然后是检测消息本身是否符合长度相关
private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
//如果topic名字大于127 if (msg.getTopic().length() > Byte.MAX_VALUE) { log.warn(“putMessage message topic length too long ” + msg.getTopic().length()); return PutMessageStatus.MESSAGE_ILLEGAL; } //如果属性值大于32767个字符 if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { log.warn(“putMessage message properties length too long ” + msg.getPropertiesString().length()); return PutMessageStatus.MESSAGE_ILLEGAL; } return PutMessageStatus.PUT_OK; } |
然后进入commitLog#putMessage中
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting // on the client) //存入现在的校验值,方便后续的计算,进行一致性的比较,使用的是常用的crc校验方式 msg.setBodyCRC(UtilAll.crc32(msg.getBody())); |
然后我们获取到是不是延迟消息
如果是DelayTimeLevel大于0的话,那么现将topic设置为延迟消息主题
//设置topic为延迟消息主题
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
然后将原本的消息队列存储起来
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
获取到可以写入的CommitLog文件,从mappendFileQueue中获取,本质上就是对应的文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null; while (!this.mappedFiles.isEmpty()) { try { mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() – 1); break; } catch (IndexOutOfBoundsException e) { //continue; } catch (Exception e) { log.error(“getLastMappedFile has exception.”, e); break; } } return mappedFileLast; } |
在这一段中,获取到最后一个可以写入的commitLog
这样获取的原理在于
commitlog目录中,每一个文件默认大小为1G,以文件的第一个偏移量为文件名,不足的以0对其,第一个文件的文件名为0,第二个文件名为1073741824,也是方便快速的通过文件偏移量获取到对应的消息
继续走主流程
写入之前,申请putMessageLock,而且是阻塞式的获取锁
然后检测并尝试创建mappendFile
//为空,或者满了
if (null == mappedFile || mappedFile.isFull()) { //直接去创建第一个文件 mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } //还是为空,说明是创建失败,抛出CREATE MAPEDFILE FAILED 磁盘不够或者权限不足 if (null == mappedFile) { log.error(“create mapped file1 error, topic: ” + msg.getTopic() + ” clientAddr: ” + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); } |
接下来,MappendFile已经创建出来了,那么追加到mappendFile中
走MappendFile的appendMessagesInner函数
//获取到写指针
int currentPos = this.wrotePosition.get(); if (currentPos < this.fileSize) { //…… } // 走到这一步,说明文件已经写满了,抛出一个AppendMessageStatus.UNKOWN_ERRPR log.error(“MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}”, currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); |
如果文件写满了,那么就会抛出一个AppendMessageStatus.UNKOWN_ERROR
如果小于的话,那么
先利用slice获取到一个ByteBuffer
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
设置对应的position为当前指针
然后走下一步操作,进行添加,生成result返回
走到doAppend函数中
进行一些预热的操作
首先是加上对应的文件大小,然后创建全局唯一的消息Id,
由 4字节的IP 4字节的端口号 以及8字节的消息偏移量构成
然后获取到对应的偏移量,并尝试写入
计算消息的总长度,获取到属性长度,获取到到topic长度,消息体长度
在calMsgLength中,进行计算消息体长度
根据定长的和不定长的组成对应的消息体长度,总共大小不会超过4个字节
获取到了消息长度之后,我们进行比对
如果长度大于了最大的消息长度,报错
如果消息体长度加上当前长度大于了文件的最大长度,那么会考虑创建一个新的commitLog文件来进行存储
接下来进行实际的存入
一步步的存入实际的值,诸如msgLen,消息体,topic等
生成返回值进行返回
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); // 4 QUEUEID this.msgStoreItemMemory.putInt(msgInner.getQueueId()); // 5 FLAG this.msgStoreItemMemory.putInt(msgInner.getFlag()); // 6 QUEUEOFFSET this.msgStoreItemMemory.putLong(queueOffset); // 7 PHYSICALOFFSET this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); // 8 SYSFLAG this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); // 9 BORNTIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); // 10 BORNHOST this.resetByteBuffer(bornHostHolder, bornHostLength); this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder)); // 11 STORETIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.resetByteBuffer(storeHostHolder, storeHostLength); this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder)); // 13 RECONSUMETIMES this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); // 15 BODY this.msgStoreItemMemory.putInt(bodyLength); if (bodyLength > 0) this.msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES this.msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData); final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); //生成返回值 AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() – beginTimeMills); |
但是这时候,只是将内容存在了ByteBuffer中,并没有输盘
是的这里面还有buffer的刷盘过程
那么在最后,我们说一下org.apache.rocketmq.store.AppendMessageResult
内部含有的属性如下
AppendMessageStatus 消息结果,追加成功就是PUT_OK
long wroteOffset 消息的物理偏移量
String msgId 消息Id
long storeTimestamp消息时间戳
int msgNum = 1 消息条数,批量消息发送的时候消息条数
那么到这里,就基本返回函数了
别忘了解锁的操作
然后生成返回丢向
进行真正的刷盘工作
handleDiskFlush(result,putMessageResult,msg)
输盘完成之后执行HA主从同步复制