本章,我们来讲NameServer的路由注册
因为作为一个注册中心,需要提供Topic相关的路由信息
NameServer需要存储路由的基础信息,还能管理Broker节点信息
首先是关于NameServer的数据结构,通过这些数据结构,我们可以查看NameServer存储的相关信息
//存储Topic路由信息,根据Topic查找Broker名称等信息
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; //Broker信息,根据broker名称,查找对应的Borker详情 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; //Broker集群信息,内部存储了所有的Broker名称 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; //状态信息,心跳时候更新 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; //类模式过滤,暂时不了解 private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; |
从上面的队列可以看出,Topic对应多个消息队列
一个Broker为一个主题创建4个读4个写,brokerId为0代表Master
对应的类图如下
说完了数据结构,我们来了解一下对应的路由注册
路由注册是利用心跳做到的,
Broker在启动的时候向集群中所有的NameServer发送心跳请求,然后每隔30秒发送心跳包,NameServer收到心跳包的时候更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdateTimeStamp
如果120s没有收到心跳包,就会移除这个Broker的路由信息并且关闭Socket
关于Broker端的心跳发送代码,基本如下
在BrokerController类中的start函数中
有对应的启动定时线程池执行发送注册心跳的代码
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error(“registerBrokerAll Exception”, e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); |
对应到内部操作代码,基本如下
首先是registerBrokerAll,内部查询所有的默认config的数据
组装成为包装类调用
doRegisterBrokerAll
最终调用到
BrokerOuterAPI的registerBrokerAll,其内部主要流程为获取所有的NameServer列表,依次向NameServer发送心跳包
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final boolean oneway, final int timeoutMills, final boolean compressed) { final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>(); List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr);//地址 requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr);//master地址 requestHeader.setCompressed(compressed);//是否开启压缩 //设置请求体 RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);//主体配置,Manager中的topicConfigTable,内部的默认Topic requestBody.setFilterServerList(filterServerList);//消息过滤服务器 //Netty中还会设置RequestCode final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); //这版本上了countDownLatch进行相关的发送处理 final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); } log.info(“register broker[{}]to name server {} OK”, brokerId, namesrvAddr); } catch (Exception e) { log.warn(“registerBroker Exception, {}”, namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } return registerBrokerResultList; } |
在每个启动中都有RequestCode,在对应的Netty中进行相对应的处理,具体的最终都可以使用RequestCode进行查找
接下来,对应的NameServer处理则在处理器中DefaultRequestProcessor中进行处理
case RequestCode.REGISTER_BROKER:
//注册请求
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
然后我们看对应的请求栈
在对应的registerBorker接口中
我们实际使用namesrvController中的registerBroker接口进行调用
public RegisterBrokerResult registerBroker(
final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { //首先加可重入锁 this.lock.writeLock().lockInterruptibly(); //获取broker集群信息 Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); boolean registerFirst = false; //维护BrokerData信息 BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT> //The same IP:PORT must only have one record in brokerAddrTable Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> item = it.next(); if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { it.remove(); } } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); //如果是第一次注册,或者老地址不存在 registerFirst = registerFirst || (null == oldAddr); //维护topic元信息,存储默认topic if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { //利用对应的topic信息,进行注册 ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } //更新存活信息 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); if (null == prevBrokerLiveInfo) { log.info(“new broker registered, {} HAServer: {}”, brokerAddr, haServerAddr); } //更新过滤器 if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } } if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error(“registerBroker Exception”, e); } return result; } |
整体流程涉及一个锁机制,避免了数据更新不一致的情况