最后说一下NameServer路由信息的删除
之前说了Broker每隔30s发送一个心跳包,心跳包含有BrokerId,Broker地址,名称,集群名称,Broker的FilterServer列表
然后NameServer内部10s扫描brokerLiveTable状态表,如果时间戳超过120s无更新,就认为Broker失败,移除Broker的相关连接
更新topicQueueTable brokerAddrTable brokerLiveTable filterServerTbale
触发点有两个
(1)NameServer定时扫描brokerLiveTable检测上次心跳包和当前系统时间的时间差,大于120s,则会移除这个Broker
(2)Borker正常关闭的时候,发来的unregisterBroker的指令
本质上都是在RouteManager中移除对应的信息
故,我们以第一种的流程为例,走一个全流程
首先是RouteInfoManager的scanNotActiveBroker
这个函数必然是在NameSrvController中有对应的定时任务进行相关的调用
public void scanNotActiveBroker() {
//此函数无锁,毕竟是单例的调用 Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); //如果小于规定的时间 if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { //netty相关,移除 RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn(“The broker channel expired, {} {}ms”, next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); //移除对应的结构体的代码在此 this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } } |
在onChannelDestroy中
第一步,移除心跳对应结构体,获取对应的brokerAddr
String brokerAddrFound = null;
if (channel != null) { try { try { //请求锁,防止更新心跳 this.lock.readLock().lockInterruptibly(); Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable = this.brokerLiveTable.entrySet().iterator(); while (itBrokerLiveTable.hasNext()) { Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next(); //请求查找对应的brokerAddrFound if (entry.getValue().getChannel() == channel) { brokerAddrFound = entry.getKey(); break; } } } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error(“onChannelDestroy Exception”, e); } } if (null == brokerAddrFound) { brokerAddrFound = remoteAddr; } else { log.info(“the broker’s channel destroyed, {}, clean it’s data structure at once”, brokerAddrFound); } |
2.利用address,来从移除brokerAddrTable中相关结构体
try {
//申请读锁,将brokerAddress从brokerLiveTable,filterServerTable移除 this.lock.writeLock().lockInterruptibly(); this.brokerLiveTable.remove(brokerAddrFound); this.filterServerTable.remove(brokerAddrFound); String brokerNameFound = null; boolean removeBrokerName = false; //维护对应的brokerAddrTable Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator(); //遍历 while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { BrokerData brokerData = itBrokerAddrTable.next().getValue(); //直接拿到value Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator(); while (it.hasNext()) { //value存储的是一个name下的addrs Entry<Long, String> entry = it.next(); Long brokerId = entry.getKey(); String brokerAddr = entry.getValue(); if (brokerAddr.equals(brokerAddrFound)) { //找到了 brokerNameFound = brokerData.getBrokerName(); //移除 it.remove(); log.info(“remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed”, brokerId, brokerAddr); break; } } if (brokerData.getBrokerAddrs().isEmpty()) { //删除完成发现如果全为空,删除这个brokerName对应条目 removeBrokerName = true; itBrokerAddrTable.remove(); log.info(“remove brokerName[{}] from brokerAddrTable, because channel destroyed”, brokerData.getBrokerName()); } } |
3.修改clusterAddrTable相关数据结构没如果没有的时候,进行移除
if (brokerNameFound != null && removeBrokerName) {
Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, Set<String>> entry = it.next(); String clusterName = entry.getKey(); Set<String> brokerNames = entry.getValue(); boolean removed = brokerNames.remove(brokerNameFound); if (removed) { log.info(“remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed”, brokerNameFound, clusterName); if (brokerNames.isEmpty()) { log.info(“remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster”, clusterName); it.remove(); } break; } } } |
4.移除对应的topicQueue信息
//根据上一步BorkerAddrTable的信息,判断是否移除,topicQueueTable
if (removeBrokerName) { Iterator<Entry<String, List<QueueData>>> itTopicQueueTable = this.topicQueueTable.entrySet().iterator(); while (itTopicQueueTable.hasNext()) { Entry<String, List<QueueData>> entry = itTopicQueueTable.next(); String topic = entry.getKey(); List<QueueData> queueDataList = entry.getValue(); Iterator<QueueData> itQueueData = queueDataList.iterator(); while (itQueueData.hasNext()) { QueueData queueData = itQueueData.next(); if (queueData.getBrokerName().equals(brokerNameFound)) { itQueueData.remove(); log.info(“remove topic[{} {}], from topicQueueTable, because channel destroyed”, topic, queueData); } } //如果为空 if (queueDataList.isEmpty()) { itTopicQueueTable.remove(); log.info(“remove topic[{}] all queue, from topicQueueTable, because channel destroyed”, topic); } } } |
最后说一个路由发现的机制
RocketMQ路由发现是非实时的,路由变化之后,NameServer自我保存即可,等待客户端定期的拉取最新的路由信息
对应流程如下
DefaultProcessor
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
//根据主题名拉取路由信息的命令如下
return this.getRouteInfoByTopic(ctx, request);
在其中调用对应的函数
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); //获取对应的topicRouteData TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic()); if (topicRouteData != null) { //根据config判断是否是顺序消息 if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) { String orderTopicConf = //获取对应配置信息进行路由信息填充 this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic()); topicRouteData.setOrderTopicConf(orderTopicConf); } byte[] content = topicRouteData.encode(); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } //没找到的情况 response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark(“No topic route info in name server for the topic: ” + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; } |
内部pickupTopicRouteData函数获取topic
然后进行填充
但是别忘了因为borker的心跳机制,导致