RMQ支持三种消息发送方式, 同步 异步 单向
同步的时候,发送消息,阻塞等待,直到消息服务器返回发送结果
异步的时候,并不阻塞,而是在成功后执行对应的回调函数,回调函数在一个新的线程中执行
单向,发送消息API后直接返回,并不关心消息是否发送成功
那么我们说一下RMQ的消息
RMQ的消息主要包含主题topic,消息Flag,属性,消息体
Message被封装为了
package org.apache.rocketmq.common.message;
主要的属性有 topic 消息Flag 扩展属性 消息体
RocketMQ定义的MessageFlag如下所示
public class MessageSysFlag {
public final static int COMPRESSED_FLAG = 0x1; public final static int MULTI_TAGS_FLAG = 0x1 << 1; public final static int TRANSACTION_NOT_TYPE = 0; public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2; public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2; public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; public final static int BORNHOST_V6_FLAG = 0x1 << 4; public final static int STOREHOSTADDRESS_V6_FLAG = 0x1 << 5; |
基本上都是基于位运算的Flag
创建Message的构造函数
public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
//主题 this.topic = topic; //Flag this.flag = flag; //主题 this.body = body; //设置属性,带有Tags if (tags != null && tags.length() > 0) this.setTags(tags); //设置属性,带有keys if (keys != null && keys.length() > 0) this.setKeys(keys); //是否等待消息存储完成 this.setWaitStoreMsgOK(waitStoreMsgOK); //此上三者,都存储在Message的properties中,一个HashMap中 } |