我们主要说一下消息发送之前的前置流程的步骤
我们首先从消息发送的入口开始看
上文我们说了DefaultMQProducer的作用,作为一个委托者,内部委托给了对应的DefaultMQProducerImpl
那么入口必然在DefaultMQProducer中存在,
@Override
public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { Validators.checkMessage(msg, this); msg.setTopic(withNamespace(msg.getTopic())); return this.defaultMQProducerImpl.send(msg); } |
调用对应的defaultMQProducerImpl
我们在send函数中,采用的同步发送,因此,我们的超时时间默认是3s
在DefaultMQProducer中的send中,我们首先调用进行消息长度校验
Validators.checkMessage(msg, this);
不能为0且默认不能超过最大长度,在DefaultMQProducer中的属性 4M
然后进行了本身的校验之后
我们需要查找主题路由信息
我们利用主题路由信息来将消息发送到对应的Broker
对应的路由查找入口在
DefaultMQProducerImpl中的
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); |
在对应的tryToFindTopicPublishInfo中是查找路由信息的方法,如果生产者中缓冲了topic的路由信息,路由信息中包含了消息队列,直接返回该路由信息,没有缓存则包含消息队列,则向NameServer查询该topic的路由信息,最终没有找到路由信息,抛出异常,那么我们所需要的路由信息就在TopicPublishInfo中
我们看一下TopicPublishInfo的数据结构
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false; private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private TopicRouteData topicRouteData; |
对应的属性有
orderTopic,是否是顺序消息
List<MessageQueue> messageQueueList 这个主题队列的消息队列
sendWhichQueue 没选择一次消息队列,这个值加一
然后是TopicRouteData的数据结构
private String orderTopicConf;
private List<QueueData> queueDatas; private List<BrokerData> brokerDatas; private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; |
queueData,topic队列元数据
brokerDatas topic分布的broker元数据
filterServerTable 过滤服务器地址列表
那么我们先从第一次start后,没有缓存topic路由信息开始,
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//尝试获取topic路由信息的api TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); //尝试从NameServer更新这个topic的信息 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { //不熊,就用默认的createKey更新路由信息 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } } |
分别使用topic和默认createKey去查询NameServer进行更新
如何查询NameServer,获取路由信息
if (isDefault && defaultMQProducer != null) {
//如果为true且routeManager不为空,那么使用默认主题查询整体路由信息 topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { //利用topic去查,使用默认主题去查询,查询到了路由信息,获取到路由信息 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); } if (topicRouteData != null) { //不为空,进行比较 TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old, topicRouteData); //相同返回false if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info(“the topic[{}] route info changed, old[{}] ,new[{}]”, topic, old, topicRouteData); } if (changed) { TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); } // Update Pub info { TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQProducerInner> entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { //更新这个MQClientInstance所管辖的关于topic的路由信息 impl.updateTopicPublishInfo(topic, publishInfo); } } } |
上面获取到了topicRouteData之后,要转换为topicPublishInfo的List<MessageQueue> 列表
转换的实现,在topicRouteData2TopicPublishInfo函数中,
最后更新MQInstance管辖的Topic路由信息,完成整体的路由查找
最后,在发送之前,我们要说的最后一件事,就是关于发送时候的消息队列选择
我们在上面更新获取了TopicPublishInfo了
上面代码中,如果topicA在broker-a/b上各有4个队列
获取的消息队列会转换出
{brokerName: “broker-a” , queueId : “0”}
{brokerName: “broker-a” , queueId : “1”}
{brokerName: “broker-a” , queueId : “2”}
{brokerName: “broker-a” , queueId : “3”}
如何采用了重试机制,同步会进行重试
异步会在执行回调之前进行重试
我们看一下默认对应的队列选择
首先是在Send的主函数中,我们调用函数获取一个可用的队列
//获取到一个实际可以发送的MessageQueue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); |
一路到了MQFaultStrategy中的selectOneMessageQueue函数中,
根据sendLatencyFaultEnable
这个成员变量,来决定是否启用Broker故障延迟机制
如果不启用Borker机制,直接调用topicInfo自带的selectOneMessageQueue来进行获取
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
//如果没有失败过,就直接走正常流程 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } } public MessageQueue selectOneMessageQueue() { //因为是轮询,所以会获得下一个 int index = this.sendWhichQueue.getAndIncrement(); //别ArrayIndexException了 int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; //直接获取返回 return this.messageQueueList.get(pos); } |
这样就能简单的规避一下故障的Broker
接下来是故障延迟的机制
故障延迟用于选择一个队列发送时候有用
//如果开启了故障延迟机制
if (this.sendLatencyFaultEnable) { try { //获取到下一个messageQueue int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //判断是否可用,从一个table中获取到,检测是否可用 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } //选出一个不那么好的,但是基本可用的,如果没有返回,就是个null final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error(“Error occurred when selecting message queue”, e); } return tpInfo.selectOneMessageQueue(); } |
获取是否开启故障延迟
然后获取到一个消息队列,然后检测这个消息队列是否可用,如果返回的消息队列可用,就移除对应的topic条目,说明可用
那么这个isAvailable的状态位是从哪里来的呢?
就利用了对应的update函数,位于LatencyFaultToleranceImpl中
//更新brokerName对应的状态
@Override public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); //设置不可用的时间 faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } |
检测的时候,就检测这个StartTimestamp
那么什么地方进行的更新呢?就是在DefaultMQProducer中进行更新
如果在发送的过程中,出现了异常
或者是每次发送完成了
但是每次发送完成了,调用函数传入的是flase
这个参数如果是true,会使用30s作为computeNotAvailableDuration方法的参数
如果是flase,那么会使用时延作为参数
这个时间就是接下来不接受评比的时间
这里需要注意的是
currentLatency startTimeStamp是被volatile修饰的
startTimeStamp是当前系统时间加上需要规避的时长,startTimeStamp是判断broker当前可用的直接依据
说完了如何选择消息队列之后
我们来说一下消息的发送