我们主要说一下消息发送之前的前置流程的步骤

我们首先从消息发送的入口开始看

上文我们说了DefaultMQProducer的作用,作为一个委托者,内部委托给了对应的DefaultMQProducerImpl

那么入口必然在DefaultMQProducer中存在,

@Override

public SendResult send(

Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

Validators.checkMessage(msg, this);

msg.setTopic(withNamespace(msg.getTopic()));

return this.defaultMQProducerImpl.send(msg);

}

调用对应的defaultMQProducerImpl

我们在send函数中,采用的同步发送,因此,我们的超时时间默认是3s

在DefaultMQProducer中的send中,我们首先调用进行消息长度校验

Validators.checkMessage(msg, this);

不能为0且默认不能超过最大长度,在DefaultMQProducer中的属性 4M

然后进行了本身的校验之后

我们需要查找主题路由信息

我们利用主题路由信息来将消息发送到对应的Broker

对应的路由查找入口在

DefaultMQProducerImpl中的

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

在对应的tryToFindTopicPublishInfo中是查找路由信息的方法,如果生产者中缓冲了topic的路由信息,路由信息中包含了消息队列,直接返回该路由信息,没有缓存则包含消息队列,则向NameServer查询该topic的路由信息,最终没有找到路由信息,抛出异常,那么我们所需要的路由信息就在TopicPublishInfo中

我们看一下TopicPublishInfo的数据结构

private boolean orderTopic = false;

private boolean haveTopicRouterInfo = false;

private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();

private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

private TopicRouteData topicRouteData;

对应的属性有

orderTopic,是否是顺序消息

List<MessageQueue> messageQueueList 这个主题队列的消息队列

sendWhichQueue 没选择一次消息队列,这个值加一

然后是TopicRouteData的数据结构

private String orderTopicConf;

private List<QueueData> queueDatas;

private List<BrokerData> brokerDatas;

private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

queueData,topic队列元数据

brokerDatas topic分布的broker元数据

filterServerTable 过滤服务器地址列表

那么我们先从第一次start后,没有缓存topic路由信息开始,

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {

//尝试获取topic路由信息的api

TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);

if (null == topicPublishInfo || !topicPublishInfo.ok()) {

this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());

//尝试从NameServer更新这个topic的信息

this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);

topicPublishInfo = this.topicPublishInfoTable.get(topic);

}

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {

return topicPublishInfo;

} else {

//不熊,就用默认的createKey更新路由信息

this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);

topicPublishInfo = this.topicPublishInfoTable.get(topic);

return topicPublishInfo;

}

}

分别使用topic和默认createKey去查询NameServer进行更新

如何查询NameServer,获取路由信息

if (isDefault && defaultMQProducer != null) {

//如果为true且routeManager不为空,那么使用默认主题查询整体路由信息

topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),

1000 * 3);

if (topicRouteData != null) {

for (QueueData data : topicRouteData.getQueueDatas()) {

int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());

data.setReadQueueNums(queueNums);

data.setWriteQueueNums(queueNums);

}

}

} else {

//利用topic去查,使用默认主题去查询,查询到了路由信息,获取到路由信息

topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);

}

if (topicRouteData != null) {

//不为空,进行比较

TopicRouteData old = this.topicRouteTable.get(topic);

boolean changed = topicRouteDataIsChange(old, topicRouteData);

//相同返回false

if (!changed) {

changed = this.isNeedUpdateTopicRouteInfo(topic);

} else {

log.info(“the topic[{}] route info changed, old[{}] ,new[{}]”, topic, old, topicRouteData);

}

if (changed) {

TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

for (BrokerData bd : topicRouteData.getBrokerDatas()) {

this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());

}

// Update Pub info

{

TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);

publishInfo.setHaveTopicRouterInfo(true);

Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();

while (it.hasNext()) {

Entry<String, MQProducerInner> entry = it.next();

MQProducerInner impl = entry.getValue();

if (impl != null) {

//更新这个MQClientInstance所管辖的关于topic的路由信息

impl.updateTopicPublishInfo(topic, publishInfo);

}

}

}

上面获取到了topicRouteData之后,要转换为topicPublishInfo的List<MessageQueue> 列表

转换的实现,在topicRouteData2TopicPublishInfo函数中,

最后更新MQInstance管辖的Topic路由信息,完成整体的路由查找

最后,在发送之前,我们要说的最后一件事,就是关于发送时候的消息队列选择

我们在上面更新获取了TopicPublishInfo了

上面代码中,如果topicA在broker-a/b上各有4个队列

获取的消息队列会转换出

{brokerName: “broker-a” , queueId : “0”}

{brokerName: “broker-a” , queueId : “1”}

{brokerName: “broker-a” , queueId : “2”}

{brokerName: “broker-a” , queueId : “3”}

如何采用了重试机制,同步会进行重试

异步会在执行回调之前进行重试

我们看一下默认对应的队列选择

首先是在Send的主函数中,我们调用函数获取一个可用的队列

//获取到一个实际可以发送的MessageQueue

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

一路到了MQFaultStrategy中的selectOneMessageQueue函数中,

根据sendLatencyFaultEnable

这个成员变量,来决定是否启用Broker故障延迟机制

如果不启用Borker机制,直接调用topicInfo自带的selectOneMessageQueue来进行获取

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {

//如果没有失败过,就直接走正常流程

if (lastBrokerName == null) {

return selectOneMessageQueue();

} else {

for (int i = 0; i < this.messageQueueList.size(); i++) {

int index = this.sendWhichQueue.getAndIncrement();

int pos = Math.abs(index) % this.messageQueueList.size();

if (pos < 0)

pos = 0;

MessageQueue mq = this.messageQueueList.get(pos);

if (!mq.getBrokerName().equals(lastBrokerName)) {

return mq;

}

}

return selectOneMessageQueue();

}

}

public MessageQueue selectOneMessageQueue() {

//因为是轮询,所以会获得下一个

int index = this.sendWhichQueue.getAndIncrement();

//别ArrayIndexException了

int pos = Math.abs(index) % this.messageQueueList.size();

if (pos < 0)

pos = 0;

//直接获取返回

return this.messageQueueList.get(pos);

}

这样就能简单的规避一下故障的Broker

接下来是故障延迟的机制

故障延迟用于选择一个队列发送时候有用

//如果开启了故障延迟机制

if (this.sendLatencyFaultEnable) {

try {

//获取到下一个messageQueue

int index = tpInfo.getSendWhichQueue().getAndIncrement();

for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {

int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();

if (pos < 0)

pos = 0;

MessageQueue mq = tpInfo.getMessageQueueList().get(pos);

//判断是否可用,从一个table中获取到,检测是否可用

if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))

return mq;

}

//选出一个不那么好的,但是基本可用的,如果没有返回,就是个null

final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();

int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);

if (writeQueueNums > 0) {

final MessageQueue mq = tpInfo.selectOneMessageQueue();

if (notBestBroker != null) {

mq.setBrokerName(notBestBroker);

mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);

}

return mq;

} else {

latencyFaultTolerance.remove(notBestBroker);

}

} catch (Exception e) {

log.error(“Error occurred when selecting message queue”, e);

}

return tpInfo.selectOneMessageQueue();

}

获取是否开启故障延迟

然后获取到一个消息队列,然后检测这个消息队列是否可用,如果返回的消息队列可用,就移除对应的topic条目,说明可用

那么这个isAvailable的状态位是从哪里来的呢?

就利用了对应的update函数,位于LatencyFaultToleranceImpl中

//更新brokerName对应的状态

@Override

public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {

FaultItem old = this.faultItemTable.get(name);

if (null == old) {

final FaultItem faultItem = new FaultItem(name);

faultItem.setCurrentLatency(currentLatency);

//设置不可用的时间

faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

old = this.faultItemTable.putIfAbsent(name, faultItem);

if (old != null) {

old.setCurrentLatency(currentLatency);

old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

}

} else {

old.setCurrentLatency(currentLatency);

old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

}

}

检测的时候,就检测这个StartTimestamp

那么什么地方进行的更新呢?就是在DefaultMQProducer中进行更新

如果在发送的过程中,出现了异常

图片

或者是每次发送完成了

但是每次发送完成了,调用函数传入的是flase

这个参数如果是true,会使用30s作为computeNotAvailableDuration方法的参数

如果是flase,那么会使用时延作为参数

这个时间就是接下来不接受评比的时间

这里需要注意的是

currentLatency startTimeStamp是被volatile修饰的

startTimeStamp是当前系统时间加上需要规避的时长,startTimeStamp是判断broker当前可用的直接依据

说完了如何选择消息队列之后

我们来说一下消息的发送

发表评论

邮箱地址不会被公开。 必填项已用*标注