本章,我们来讲NameServer的路由注册

因为作为一个注册中心,需要提供Topic相关的路由信息

NameServer需要存储路由的基础信息,还能管理Broker节点信息

首先是关于NameServer的数据结构,通过这些数据结构,我们可以查看NameServer存储的相关信息

//存储Topic路由信息,根据Topic查找Broker名称等信息

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

//Broker信息,根据broker名称,查找对应的Borker详情

private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

//Broker集群信息,内部存储了所有的Broker名称

private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

//状态信息,心跳时候更新

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

//类模式过滤,暂时不了解

private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

从上面的队列可以看出,Topic对应多个消息队列

一个Broker为一个主题创建4个读4个写,brokerId为0代表Master

对应的类图如下

图片

图片

图片

说完了数据结构,我们来了解一下对应的路由注册

路由注册是利用心跳做到的,

Broker在启动的时候向集群中所有的NameServer发送心跳请求,然后每隔30秒发送心跳包,NameServer收到心跳包的时候更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdateTimeStamp

如果120s没有收到心跳包,就会移除这个Broker的路由信息并且关闭Socket

关于Broker端的心跳发送代码,基本如下

在BrokerController类中的start函数中

有对应的启动定时线程池执行发送注册心跳的代码

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());

} catch (Throwable e) {

log.error(“registerBrokerAll Exception”, e);

}

}

}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

对应到内部操作代码,基本如下

首先是registerBrokerAll,内部查询所有的默认config的数据

组装成为包装类调用

doRegisterBrokerAll

最终调用到

BrokerOuterAPI的registerBrokerAll,其内部主要流程为获取所有的NameServer列表,依次向NameServer发送心跳包

public List<RegisterBrokerResult> registerBrokerAll(

final String clusterName,

final String brokerAddr,

final String brokerName,

final long brokerId,

final String haServerAddr,

final TopicConfigSerializeWrapper topicConfigWrapper,

final List<String> filterServerList,

final boolean oneway,

final int timeoutMills,

final boolean compressed) {

final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();

List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();

if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();

requestHeader.setBrokerAddr(brokerAddr);//地址

requestHeader.setBrokerId(brokerId);

requestHeader.setBrokerName(brokerName);

requestHeader.setClusterName(clusterName);

requestHeader.setHaServerAddr(haServerAddr);//master地址

requestHeader.setCompressed(compressed);//是否开启压缩

//设置请求体

RegisterBrokerBody requestBody = new RegisterBrokerBody();

requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);//主体配置,Manager中的topicConfigTable,内部的默认Topic

requestBody.setFilterServerList(filterServerList);//消息过滤服务器

//Netty中还会设置RequestCode

final byte[] body = requestBody.encode(compressed);

final int bodyCrc32 = UtilAll.crc32(body);

requestHeader.setBodyCrc32(bodyCrc32);

//这版本上了countDownLatch进行相关的发送处理

final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());

for (final String namesrvAddr : nameServerAddressList) {

brokerOuterExecutor.execute(new Runnable() {

@Override

public void run() {

try {

RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);

if (result != null) {

registerBrokerResultList.add(result);

}

log.info(“register broker[{}]to name server {} OK”, brokerId, namesrvAddr);

} catch (Exception e) {

log.warn(“registerBroker Exception, {}”, namesrvAddr, e);

} finally {

countDownLatch.countDown();

}

}

});

}

try {

countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);

} catch (InterruptedException e) {

}

}

return registerBrokerResultList;

}

在每个启动中都有RequestCode,在对应的Netty中进行相对应的处理,具体的最终都可以使用RequestCode进行查找

接下来,对应的NameServer处理则在处理器中DefaultRequestProcessor中进行处理

case RequestCode.REGISTER_BROKER:

//注册请求

Version brokerVersion = MQVersion.value2Version(request.getVersion());

if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {

return this.registerBrokerWithFilterServer(ctx, request);

} else {

return this.registerBroker(ctx, request);

}

然后我们看对应的请求栈

在对应的registerBorker接口中

我们实际使用namesrvController中的registerBroker接口进行调用

public RegisterBrokerResult registerBroker(

final String clusterName,

final String brokerAddr,

final String brokerName,

final long brokerId,

final String haServerAddr,

final TopicConfigSerializeWrapper topicConfigWrapper,

final List<String> filterServerList,

final Channel channel) {

RegisterBrokerResult result = new RegisterBrokerResult();

try {

try {

//首先加可重入锁

this.lock.writeLock().lockInterruptibly();

//获取broker集群信息

Set<String> brokerNames = this.clusterAddrTable.get(clusterName);

if (null == brokerNames) {

brokerNames = new HashSet<String>();

this.clusterAddrTable.put(clusterName, brokerNames);

}

brokerNames.add(brokerName);

boolean registerFirst = false;

//维护BrokerData信息

BrokerData brokerData = this.brokerAddrTable.get(brokerName);

if (null == brokerData) {

registerFirst = true;

brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());

this.brokerAddrTable.put(brokerName, brokerData);

}

Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();

//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>

//The same IP:PORT must only have one record in brokerAddrTable

Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();

while (it.hasNext()) {

Entry<Long, String> item = it.next();

if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {

it.remove();

}

}

String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);

//如果是第一次注册,或者老地址不存在

registerFirst = registerFirst || (null == oldAddr);

//维护topic元信息,存储默认topic

if (null != topicConfigWrapper

&& MixAll.MASTER_ID == brokerId) {

if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())

|| registerFirst) {

//利用对应的topic信息,进行注册

ConcurrentMap<String, TopicConfig> tcTable =

topicConfigWrapper.getTopicConfigTable();

if (tcTable != null) {

for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {

this.createAndUpdateQueueData(brokerName, entry.getValue());

}

}

}

}

//更新存活信息

BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,

new BrokerLiveInfo(

System.currentTimeMillis(),

topicConfigWrapper.getDataVersion(),

channel,

haServerAddr));

if (null == prevBrokerLiveInfo) {

log.info(“new broker registered, {} HAServer: {}”, brokerAddr, haServerAddr);

}

//更新过滤器

if (filterServerList != null) {

if (filterServerList.isEmpty()) {

this.filterServerTable.remove(brokerAddr);

} else {

this.filterServerTable.put(brokerAddr, filterServerList);

}

}

if (MixAll.MASTER_ID != brokerId) {

String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);

if (masterAddr != null) {

BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);

if (brokerLiveInfo != null) {

result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());

result.setMasterAddr(masterAddr);

}

}

}

} finally {

this.lock.writeLock().unlock();

}

} catch (Exception e) {

log.error(“registerBroker Exception”, e);

}

return result;

}

整体流程涉及一个锁机制,避免了数据更新不一致的情况

发表评论

邮箱地址不会被公开。 必填项已用*标注