那么实际的发送在DefaultMQProducerImpl中
//真正的消息发送API的入口
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout – costTime); |
在这个函数中
基本如下
//尝试从本地获取到brokerAddr
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { //从NameServer获取一下信息 tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } |
然后如果不是批消息,则会进行分配全局唯一Id
try {
//如果不属于批消息 if (!(msg instanceof MessageBatch)) { //分配全局唯一ID MessageClientIDSetter.setUniqID(msg); } |
接下来,判断大小,判断事务
//判断是否大于4K
if (this.tryToCompressMessage(msg)) { //压缩 sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } //如果是事务Prepared消息,那么消息的系统标记为TRANSACTION_PREPARED_TYPE final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } |
接下来,设计一个责任链模式
Hook的存在于一个链中
如果出现可以处理的,则进行处理
就是发送消息之前进行增强逻辑
//下面两个判断,都是检测Hook的存在
if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } //消息发送Hook的存在 if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals(“true”)) { context.setMsgType(MessageType.Trans_Msg_Half); } |
对应注册的逻辑是在MQProducerImpl的registerSendMessageHook
对应的Hook接口,则是如下
public interface SendMessageHook {
String hookName(); void sendMessageBefore(final SendMessageContext context); void sendMessageAfter(final SendMessageContext context); } |
然后构建相关的请求包
//组装请求头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); //设置生产者组 requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); //设置对应的主题 requestHeader.setTopic(msg.getTopic()); //设置默认的主题,createTopicKey requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); //默认队列数 requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); //消息系统标记 requestHeader.setFlag(msg.getFlag()); //负责构建属性 requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); //重试次数 requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); //是否批消息 requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { //看是不是设置过了重试次数 requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } |
然后进行发送
这一步发送,则是和传参中的communicationMode有关
分别是同步 异步 单向发送
当然,在默认实现中,是只支持同步发送的
switch (communicationMode) {
case ASYNC: Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } long costTimeAsync = System.currentTimeMillis() – beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException(“sendKernelImpl call timeout”); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout – costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() – beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException(“sendKernelImpl call timeout”); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout – costTimeSync, communicationMode, context, this); break; default: assert false; break; } |
分为了异步同步 单向的发送
然后是执行之后,还会利用钩子进行后置的增强
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } |
具体的同步发送流程为,我们拿最新的代码进行测试
在发送中,如果是同步,我们的调用栈如下
DefaultMQProducerImpl的 send中
到MQClientAPIImpl中的 sendMessage
然后一直到sendMessageSync
后来调用remote客户端中的同步调用
最后一路到达NettyRemotingClinet中的invokeSync中
如果是异步发送
基本流程是指在消息发送者调用发送的API之后,无须等待消息服务器返回本次消息结果,只需要提供一个回调函数,供其回调即可,消息发送的性能有了显著提高,但是为了保护消息服务器的负载压力,RMQ对异步消息有了并发的控制,利用参数clientAsyncSemaphoreValue来进行控制,默认为635535,异步发送消息可以利用DefaultMQProducet的retryTimesWhenSendAsyncFailed属性来控制消息重试次数的
但是对于重试,则是在服务器端收到响应包之后才会进行的,如果出现了网络错误,网络超时则不会重试
单向发送
就是消息发送者发送消息API后,不会等待消息返回本次消息成功与否,而且没有重试机制
最后是批量发送数据
打包成为同一批次进行发送,但是并不是同一批次的消息次数越多就性能越好,因为判断一句是单条消息的长度,单条长度不能超过DefaultMQProducer的maxMessageSize 也就是4M
那么RMQ如何编码多条消息的呢?我们可以查看一下对应的类图
首先是创建RemotingCommand
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); |
我们实际在RemotingCommand的createRequestCommand中创建的
那么我们先看一下RemotingCommand中的属性
code 请求命令编码,请求命令类型
version 版本号
opaque 客户单请求序号
flag 标记 倒数第一位表示请求类型 0 为请求,1 为返回,倒数第二位: 1表示oneway
remark是标记
extFields 扩展属性
customeHeader 每个请求对应的请求头信息
body 请求体
对于批量请求,需要將多条的内容存储在body中,如何正确的解析出每条消息呢?
RMQ的解决方式是,对单个消息内容使用固定格式进行存储
基本的处理方式在
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
MessageBatch msgBatch; try { msgBatch = MessageBatch.generateFromList(msgs); for (Message message : msgBatch) { Validators.checkMessage(message, this); MessageClientIDSetter.setUniqID(message); message.setTopic(withNamespace(message.getTopic())); } msgBatch.setBody(msgBatch.encode()); } catch (Exception e) { throw new MQClientException(“Failed to initiate the MessageBatch”, e); } msgBatch.setTopic(withNamespace(msgBatch.getTopic())); return msgBatch; } |
统一的聚合为一个MessageBatch,Btach内部持有List<Message> messages
对于的encode()函数方式如下
public static byte[] encodeMessages(List<Message> messages) {
//TO DO refactor, accumulate in one buffer, avoid copies //一个byte的列表 List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size()); int allSize = 0; for (Message message : messages) { byte[] tmp = encodeMessage(message); encodedMessages.add(tmp); //维护总量 allSize += tmp.length; } //总的一个byte byte[] allBytes = new byte[allSize]; int pos = 0; for (byte[] bytes : encodedMessages) { System.arraycopy(bytes, 0, allBytes, pos, bytes.length); pos += bytes.length; } return allBytes; } |
将其聚合为一个byte[]数组,对应的服务器端可以解析即可
最后在sendMessage函数中发送即可
request.setBody(msg.getBody()); |