最后说一下NameServer路由信息的删除

之前说了Broker每隔30s发送一个心跳包,心跳包含有BrokerId,Broker地址,名称,集群名称,Broker的FilterServer列表

然后NameServer内部10s扫描brokerLiveTable状态表,如果时间戳超过120s无更新,就认为Broker失败,移除Broker的相关连接

更新topicQueueTable brokerAddrTable brokerLiveTable filterServerTbale

触发点有两个

(1)NameServer定时扫描brokerLiveTable检测上次心跳包和当前系统时间的时间差,大于120s,则会移除这个Broker

(2)Borker正常关闭的时候,发来的unregisterBroker的指令

本质上都是在RouteManager中移除对应的信息

故,我们以第一种的流程为例,走一个全流程

首先是RouteInfoManager的scanNotActiveBroker

这个函数必然是在NameSrvController中有对应的定时任务进行相关的调用

public void scanNotActiveBroker() {

//此函数无锁,毕竟是单例的调用

Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();

while (it.hasNext()) {

Entry<String, BrokerLiveInfo> next = it.next();

long last = next.getValue().getLastUpdateTimestamp();

//如果小于规定的时间

if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {

//netty相关,移除

RemotingUtil.closeChannel(next.getValue().getChannel());

it.remove();

log.warn(“The broker channel expired, {} {}ms”, next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);

//移除对应的结构体的代码在此

this.onChannelDestroy(next.getKey(), next.getValue().getChannel());

}

}

}

在onChannelDestroy中

第一步,移除心跳对应结构体,获取对应的brokerAddr

String brokerAddrFound = null;

if (channel != null) {

try {

try {

//请求锁,防止更新心跳

this.lock.readLock().lockInterruptibly();

Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =

this.brokerLiveTable.entrySet().iterator();

while (itBrokerLiveTable.hasNext()) {

Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();

//请求查找对应的brokerAddrFound

if (entry.getValue().getChannel() == channel) {

brokerAddrFound = entry.getKey();

break;

}

}

} finally {

this.lock.readLock().unlock();

}

} catch (Exception e) {

log.error(“onChannelDestroy Exception”, e);

}

}

if (null == brokerAddrFound) {

brokerAddrFound = remoteAddr;

} else {

log.info(“the broker’s channel destroyed, {}, clean it’s data structure at once”, brokerAddrFound);

}

2.利用address,来从移除brokerAddrTable中相关结构体

try {

//申请读锁,将brokerAddress从brokerLiveTable,filterServerTable移除

this.lock.writeLock().lockInterruptibly();

this.brokerLiveTable.remove(brokerAddrFound);

this.filterServerTable.remove(brokerAddrFound);

String brokerNameFound = null;

boolean removeBrokerName = false;

//维护对应的brokerAddrTable

Iterator<Entry<String, BrokerData>> itBrokerAddrTable =

this.brokerAddrTable.entrySet().iterator();

//遍历

while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {

BrokerData brokerData = itBrokerAddrTable.next().getValue();

//直接拿到value

Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();

while (it.hasNext()) {

//value存储的是一个name下的addrs

Entry<Long, String> entry = it.next();

Long brokerId = entry.getKey();

String brokerAddr = entry.getValue();

if (brokerAddr.equals(brokerAddrFound)) {

//找到了

brokerNameFound = brokerData.getBrokerName();

//移除

it.remove();

log.info(“remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed”,

brokerId, brokerAddr);

break;

}

}

if (brokerData.getBrokerAddrs().isEmpty()) {

//删除完成发现如果全为空,删除这个brokerName对应条目

removeBrokerName = true;

itBrokerAddrTable.remove();

log.info(“remove brokerName[{}] from brokerAddrTable, because channel destroyed”,

brokerData.getBrokerName());

}

}

3.修改clusterAddrTable相关数据结构没如果没有的时候,进行移除

if (brokerNameFound != null && removeBrokerName) {

Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();

while (it.hasNext()) {

Entry<String, Set<String>> entry = it.next();

String clusterName = entry.getKey();

Set<String> brokerNames = entry.getValue();

boolean removed = brokerNames.remove(brokerNameFound);

if (removed) {

log.info(“remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed”,

brokerNameFound, clusterName);

if (brokerNames.isEmpty()) {

log.info(“remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster”,

clusterName);

it.remove();

}

break;

}

}

}

4.移除对应的topicQueue信息

//根据上一步BorkerAddrTable的信息,判断是否移除,topicQueueTable

if (removeBrokerName) {

Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =

this.topicQueueTable.entrySet().iterator();

while (itTopicQueueTable.hasNext()) {

Entry<String, List<QueueData>> entry = itTopicQueueTable.next();

String topic = entry.getKey();

List<QueueData> queueDataList = entry.getValue();

Iterator<QueueData> itQueueData = queueDataList.iterator();

while (itQueueData.hasNext()) {

QueueData queueData = itQueueData.next();

if (queueData.getBrokerName().equals(brokerNameFound)) {

itQueueData.remove();

log.info(“remove topic[{} {}], from topicQueueTable, because channel destroyed”,

topic, queueData);

}

}

//如果为空

if (queueDataList.isEmpty()) {

itTopicQueueTable.remove();

log.info(“remove topic[{}] all queue, from topicQueueTable, because channel destroyed”,

topic);

}

}

}

最后说一个路由发现的机制

RocketMQ路由发现是非实时的,路由变化之后,NameServer自我保存即可,等待客户端定期的拉取最新的路由信息

对应流程如下

DefaultProcessor

case RequestCode.GET_ROUTEINFO_BY_TOPIC:

//根据主题名拉取路由信息的命令如下

return this.getRouteInfoByTopic(ctx, request);

在其中调用对应的函数

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,

RemotingCommand request) throws RemotingCommandException {

final RemotingCommand response = RemotingCommand.createResponseCommand(null);

final GetRouteInfoRequestHeader requestHeader =

(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

//获取对应的topicRouteData

TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

if (topicRouteData != null) {

//根据config判断是否是顺序消息

if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {

String orderTopicConf =

//获取对应配置信息进行路由信息填充

this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,

requestHeader.getTopic());

topicRouteData.setOrderTopicConf(orderTopicConf);

}

byte[] content = topicRouteData.encode();

response.setBody(content);

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

return response;

}

//没找到的情况

response.setCode(ResponseCode.TOPIC_NOT_EXIST);

response.setRemark(“No topic route info in name server for the topic: ” + requestHeader.getTopic()

+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));

return response;

}

内部pickupTopicRouteData函数获取topic

然后进行填充

但是别忘了因为borker的心跳机制,导致

发表评论

邮箱地址不会被公开。 必填项已用*标注