那么实际的发送在DefaultMQProducerImpl中

//真正的消息发送API的入口

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout – costTime);

在这个函数中

基本如下

//尝试从本地获取到brokerAddr

String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());

if (null == brokerAddr) {

//从NameServer获取一下信息

tryToFindTopicPublishInfo(mq.getTopic());

brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());

}

然后如果不是批消息,则会进行分配全局唯一Id

try {

//如果不属于批消息

if (!(msg instanceof MessageBatch)) {

//分配全局唯一ID

MessageClientIDSetter.setUniqID(msg);

}

接下来,判断大小,判断事务

//判断是否大于4K

if (this.tryToCompressMessage(msg)) {

//压缩

sysFlag |= MessageSysFlag.COMPRESSED_FLAG;

msgBodyCompressed = true;

}

//如果是事务Prepared消息,那么消息的系统标记为TRANSACTION_PREPARED_TYPE

final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);

if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {

sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;

}

接下来,设计一个责任链模式

Hook的存在于一个链中

如果出现可以处理的,则进行处理

就是发送消息之前进行增强逻辑

//下面两个判断,都是检测Hook的存在

if (hasCheckForbiddenHook()) {

CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();

checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());

checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());

checkForbiddenContext.setCommunicationMode(communicationMode);

checkForbiddenContext.setBrokerAddr(brokerAddr);

checkForbiddenContext.setMessage(msg);

checkForbiddenContext.setMq(mq);

checkForbiddenContext.setUnitMode(this.isUnitMode());

this.executeCheckForbiddenHook(checkForbiddenContext);

}

//消息发送Hook的存在

if (this.hasSendMessageHook()) {

context = new SendMessageContext();

context.setProducer(this);

context.setProducerGroup(this.defaultMQProducer.getProducerGroup());

context.setCommunicationMode(communicationMode);

context.setBornHost(this.defaultMQProducer.getClientIP());

context.setBrokerAddr(brokerAddr);

context.setMessage(msg);

context.setMq(mq);

context.setNamespace(this.defaultMQProducer.getNamespace());

String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);

if (isTrans != null && isTrans.equals(“true”)) {

context.setMsgType(MessageType.Trans_Msg_Half);

}

对应注册的逻辑是在MQProducerImpl的registerSendMessageHook

对应的Hook接口,则是如下

public interface SendMessageHook {

String hookName();

void sendMessageBefore(final SendMessageContext context);

void sendMessageAfter(final SendMessageContext context);

}

然后构建相关的请求包

//组装请求头

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();

//设置生产者组

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());

//设置对应的主题

requestHeader.setTopic(msg.getTopic());

//设置默认的主题,createTopicKey

requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());

//默认队列数

requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());

requestHeader.setQueueId(mq.getQueueId());

requestHeader.setSysFlag(sysFlag);

requestHeader.setBornTimestamp(System.currentTimeMillis());

//消息系统标记

requestHeader.setFlag(msg.getFlag());

//负责构建属性

requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));

//重试次数

requestHeader.setReconsumeTimes(0);

requestHeader.setUnitMode(this.isUnitMode());

//是否批消息

requestHeader.setBatch(msg instanceof MessageBatch);

if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);

if (reconsumeTimes != null) {

//看是不是设置过了重试次数

requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));

MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);

}

String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);

if (maxReconsumeTimes != null) {

requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));

MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);

}

}

然后进行发送

这一步发送,则是和传参中的communicationMode有关

分别是同步 异步 单向发送

当然,在默认实现中,是只支持同步发送的

switch (communicationMode) {

case ASYNC:

Message tmpMessage = msg;

boolean messageCloned = false;

if (msgBodyCompressed) {

//If msg body was compressed, msgbody should be reset using prevBody.

//Clone new message using commpressed message body and recover origin massage.

//Fix bug:https://github.com/apache/rocketmq-externals/issues/66

tmpMessage = MessageAccessor.cloneMessage(msg);

messageCloned = true;

msg.setBody(prevBody);

}

if (topicWithNamespace) {

if (!messageCloned) {

tmpMessage = MessageAccessor.cloneMessage(msg);

messageCloned = true;

}

msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));

}

long costTimeAsync = System.currentTimeMillis() – beginStartTime;

if (timeout < costTimeAsync) {

throw new RemotingTooMuchRequestException(“sendKernelImpl call timeout”);

}

sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(

brokerAddr,

mq.getBrokerName(),

tmpMessage,

requestHeader,

timeout – costTimeAsync,

communicationMode,

sendCallback,

topicPublishInfo,

this.mQClientFactory,

this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),

context,

this);

break;

case ONEWAY:

case SYNC:

long costTimeSync = System.currentTimeMillis() – beginStartTime;

if (timeout < costTimeSync) {

throw new RemotingTooMuchRequestException(“sendKernelImpl call timeout”);

}

sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(

brokerAddr,

mq.getBrokerName(),

msg,

requestHeader,

timeout – costTimeSync,

communicationMode,

context,

this);

break;

default:

assert false;

break;

}

分为了异步同步 单向的发送

然后是执行之后,还会利用钩子进行后置的增强

if (this.hasSendMessageHook()) {

context.setSendResult(sendResult);

this.executeSendMessageHookAfter(context);

}

具体的同步发送流程为,我们拿最新的代码进行测试

在发送中,如果是同步,我们的调用栈如下

DefaultMQProducerImpl的 send中

到MQClientAPIImpl中的 sendMessage

然后一直到sendMessageSync

后来调用remote客户端中的同步调用

最后一路到达NettyRemotingClinet中的invokeSync中

如果是异步发送

基本流程是指在消息发送者调用发送的API之后,无须等待消息服务器返回本次消息结果,只需要提供一个回调函数,供其回调即可,消息发送的性能有了显著提高,但是为了保护消息服务器的负载压力,RMQ对异步消息有了并发的控制,利用参数clientAsyncSemaphoreValue来进行控制,默认为635535,异步发送消息可以利用DefaultMQProducet的retryTimesWhenSendAsyncFailed属性来控制消息重试次数的

但是对于重试,则是在服务器端收到响应包之后才会进行的,如果出现了网络错误,网络超时则不会重试

单向发送

就是消息发送者发送消息API后,不会等待消息返回本次消息成功与否,而且没有重试机制

最后是批量发送数据

打包成为同一批次进行发送,但是并不是同一批次的消息次数越多就性能越好,因为判断一句是单条消息的长度,单条长度不能超过DefaultMQProducer的maxMessageSize 也就是4M

那么RMQ如何编码多条消息的呢?我们可以查看一下对应的类图

首先是创建RemotingCommand

if (sendSmartMsg || msg instanceof MessageBatch) {

SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);

request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);

我们实际在RemotingCommand的createRequestCommand中创建的

那么我们先看一下RemotingCommand中的属性

图片

code 请求命令编码,请求命令类型

version 版本号

opaque 客户单请求序号

flag 标记 倒数第一位表示请求类型 0 为请求,1 为返回,倒数第二位: 1表示oneway

remark是标记

extFields 扩展属性

customeHeader 每个请求对应的请求头信息

body 请求体

对于批量请求,需要將多条的内容存储在body中,如何正确的解析出每条消息呢?

RMQ的解决方式是,对单个消息内容使用固定格式进行存储

基本的处理方式在

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {

MessageBatch msgBatch;

try {

msgBatch = MessageBatch.generateFromList(msgs);

for (Message message : msgBatch) {

Validators.checkMessage(message, this);

MessageClientIDSetter.setUniqID(message);

message.setTopic(withNamespace(message.getTopic()));

}

msgBatch.setBody(msgBatch.encode());

} catch (Exception e) {

throw new MQClientException(“Failed to initiate the MessageBatch”, e);

}

msgBatch.setTopic(withNamespace(msgBatch.getTopic()));

return msgBatch;

}

统一的聚合为一个MessageBatch,Btach内部持有List<Message> messages

对于的encode()函数方式如下

public static byte[] encodeMessages(List<Message> messages) {

//TO DO refactor, accumulate in one buffer, avoid copies

//一个byte的列表

List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());

int allSize = 0;

for (Message message : messages) {

byte[] tmp = encodeMessage(message);

encodedMessages.add(tmp);

//维护总量

allSize += tmp.length;

}

//总的一个byte

byte[] allBytes = new byte[allSize];

int pos = 0;

for (byte[] bytes : encodedMessages) {

System.arraycopy(bytes, 0, allBytes, pos, bytes.length);

pos += bytes.length;

}

return allBytes;

}

将其聚合为一个byte[]数组,对应的服务器端可以解析即可

最后在sendMessage函数中发送即可

request.setBody(msg.getBody());

发表评论

邮箱地址不会被公开。 必填项已用*标注