这次我们先整体梳理下消息存储流程

消息存储实现类是 org.apache.rocketmq.store.DefaultMessageStore

是核心类库,存储了大量的对其他文件操作的api

核心属性包括:

1.MessageStoreConfig messageStoreConfig 消息存储配置属性

2.CommitLog commitLog文件的实现类

3.ConcurrentMap<String,ConcurrentMap<Integer,ConsumeQueue>> consumeQueueTable 消息队列存储缓存表,按照消息主题分组

4.FlushConsumeQueueService 刷盘线程

5.CleanCommitLogService 清除CommitLog文件服务

6.CleanCommitQueueService 清除ConsumeQueue文件服务

7.IndexService 索引文件实现类

8.AllocateMappendFileService MappendFile分配服务

9.ReputMessageService 消息分发,根据CommitLog构建对应的ConsumeQueue,IndexFile文件

10.HAService 存储HA机制

11.MessageArrivingListener, 消息拉去长轮询模式达到监听器

12.BrokerConfig brokerConfig的配置

13.StoreCheckpoint 文件刷盘检测点

14.LinkedList dispatcherList 转发请求列表

整体流程为

整体消息存储的入口为DefaultMessageStore#putMessage,我们也是拿着同步存储进行举例的

函数中,首先进行校验

private PutMessageStatus checkStoreStatus() {

//如果已经关闭了

if (this.shutdown) {

log.warn(“message store has shutdown, so putMessage is forbidden”);

return PutMessageStatus.SERVICE_NOT_AVAILABLE;

}

//如果为Slave角色

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {

long value = this.printTimes.getAndIncrement();

if ((value % 50000) == 0) {

log.warn(“broke role is slave, so putMessage is forbidden”);

}

return PutMessageStatus.SERVICE_NOT_AVAILABLE;

}

//如果不支持写入

if (!this.runningFlags.isWriteable()) {

long value = this.printTimes.getAndIncrement();

if ((value % 50000) == 0) {

log.warn(“the message store is not writable. It may be caused by one of the following reasons: ” +

“the broker’s disk is full, write to logic queue error, write to index file error, etc”);

}

return PutMessageStatus.SERVICE_NOT_AVAILABLE;

} else {

this.printTimes.set(0);

}

if (this.isOSPageCacheBusy()) {

return PutMessageStatus.OS_PAGECACHE_BUSY;

}

return PutMessageStatus.PUT_OK;

}

然后是检测消息本身是否符合长度相关

private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {

//如果topic名字大于127

if (msg.getTopic().length() > Byte.MAX_VALUE) {

log.warn(“putMessage message topic length too long ” + msg.getTopic().length());

return PutMessageStatus.MESSAGE_ILLEGAL;

}

//如果属性值大于32767个字符

if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {

log.warn(“putMessage message properties length too long ” + msg.getPropertiesString().length());

return PutMessageStatus.MESSAGE_ILLEGAL;

}

return PutMessageStatus.PUT_OK;

}

然后进入commitLog#putMessage中

// Set the storage time

msg.setStoreTimestamp(System.currentTimeMillis());

// Set the message body BODY CRC (consider the most appropriate setting

// on the client)

//存入现在的校验值,方便后续的计算,进行一致性的比较,使用的是常用的crc校验方式

msg.setBodyCRC(UtilAll.crc32(msg.getBody()));

然后我们获取到是不是延迟消息

如果是DelayTimeLevel大于0的话,那么现将topic设置为延迟消息主题

//设置topic为延迟消息主题

topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;

然后将原本的消息队列存储起来

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));

msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

获取到可以写入的CommitLog文件,从mappendFileQueue中获取,本质上就是对应的文件

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

public MappedFile getLastMappedFile() {

MappedFile mappedFileLast = null;

while (!this.mappedFiles.isEmpty()) {

try {

mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() – 1);

break;

} catch (IndexOutOfBoundsException e) {

//continue;

} catch (Exception e) {

log.error(“getLastMappedFile has exception.”, e);

break;

}

}

return mappedFileLast;

}

在这一段中,获取到最后一个可以写入的commitLog

这样获取的原理在于

commitlog目录中,每一个文件默认大小为1G,以文件的第一个偏移量为文件名,不足的以0对其,第一个文件的文件名为0,第二个文件名为1073741824,也是方便快速的通过文件偏移量获取到对应的消息

继续走主流程

写入之前,申请putMessageLock,而且是阻塞式的获取锁

然后检测并尝试创建mappendFile

//为空,或者满了

if (null == mappedFile || mappedFile.isFull()) {

//直接去创建第一个文件

mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise

}

//还是为空,说明是创建失败,抛出CREATE MAPEDFILE FAILED 磁盘不够或者权限不足

if (null == mappedFile) {

log.error(“create mapped file1 error, topic: ” + msg.getTopic() + ” clientAddr: ” + msg.getBornHostString());

beginTimeInLock = 0;

return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);

}

接下来,MappendFile已经创建出来了,那么追加到mappendFile中

走MappendFile的appendMessagesInner函数

//获取到写指针

int currentPos = this.wrotePosition.get();

if (currentPos < this.fileSize) {

//……

}

// 走到这一步,说明文件已经写满了,抛出一个AppendMessageStatus.UNKOWN_ERRPR

log.error(“MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}”, currentPos, this.fileSize);

return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);

如果文件写满了,那么就会抛出一个AppendMessageStatus.UNKOWN_ERROR

如果小于的话,那么

先利用slice获取到一个ByteBuffer

ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();

设置对应的position为当前指针

然后走下一步操作,进行添加,生成result返回

走到doAppend函数中

进行一些预热的操作

首先是加上对应的文件大小,然后创建全局唯一的消息Id,

由 4字节的IP 4字节的端口号 以及8字节的消息偏移量构成

然后获取到对应的偏移量,并尝试写入

计算消息的总长度,获取到属性长度,获取到到topic长度,消息体长度

在calMsgLength中,进行计算消息体长度

图片

根据定长的和不定长的组成对应的消息体长度,总共大小不会超过4个字节

获取到了消息长度之后,我们进行比对

如果长度大于了最大的消息长度,报错

如果消息体长度加上当前长度大于了文件的最大长度,那么会考虑创建一个新的commitLog文件来进行存储

接下来进行实际的存入

一步步的存入实际的值,诸如msgLen,消息体,topic等

生成返回值进行返回

// Initialization of storage space

this.resetByteBuffer(msgStoreItemMemory, msgLen);

// 1 TOTALSIZE

this.msgStoreItemMemory.putInt(msgLen);

// 2 MAGICCODE

this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);

// 3 BODYCRC

this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());

// 4 QUEUEID

this.msgStoreItemMemory.putInt(msgInner.getQueueId());

// 5 FLAG

this.msgStoreItemMemory.putInt(msgInner.getFlag());

// 6 QUEUEOFFSET

this.msgStoreItemMemory.putLong(queueOffset);

// 7 PHYSICALOFFSET

this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());

// 8 SYSFLAG

this.msgStoreItemMemory.putInt(msgInner.getSysFlag());

// 9 BORNTIMESTAMP

this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());

// 10 BORNHOST

this.resetByteBuffer(bornHostHolder, bornHostLength);

this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));

// 11 STORETIMESTAMP

this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());

// 12 STOREHOSTADDRESS

this.resetByteBuffer(storeHostHolder, storeHostLength);

this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));

// 13 RECONSUMETIMES

this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());

// 14 Prepared Transaction Offset

this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());

// 15 BODY

this.msgStoreItemMemory.putInt(bodyLength);

if (bodyLength > 0)

this.msgStoreItemMemory.put(msgInner.getBody());

// 16 TOPIC

this.msgStoreItemMemory.put((byte) topicLength);

this.msgStoreItemMemory.put(topicData);

// 17 PROPERTIES

this.msgStoreItemMemory.putShort((short) propertiesLength);

if (propertiesLength > 0)

this.msgStoreItemMemory.put(propertiesData);

final long beginTimeMills = CommitLog.this.defaultMessageStore.now();

// Write messages to the queue buffer

byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

//生成返回值

AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,

msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() – beginTimeMills);

但是这时候,只是将内容存在了ByteBuffer中,并没有输盘

是的这里面还有buffer的刷盘过程

那么在最后,我们说一下org.apache.rocketmq.store.AppendMessageResult

内部含有的属性如下

AppendMessageStatus 消息结果,追加成功就是PUT_OK

long wroteOffset 消息的物理偏移量

String msgId 消息Id

long storeTimestamp消息时间戳

int msgNum = 1 消息条数,批量消息发送的时候消息条数

那么到这里,就基本返回函数了

别忘了解锁的操作

然后生成返回丢向

进行真正的刷盘工作

handleDiskFlush(result,putMessageResult,msg)

输盘完成之后执行HA主从同步复制

发表评论

邮箱地址不会被公开。 必填项已用*标注