- RocketMQ的生产者在使用过程中,只要配置一个接入的地址,就可以使用整个的集群
RokcetMQ内部进行了确定访问的主题名称和队列序号,找到对应的Broker地址
那么在这其中,NameServer起到了重要的作用
NameServer作为一个通用的概念,在多数的分布式集群中都有出现,无论是Dubbo还是Flink等系统
我们来看下NameServer的设计思想
NameServer主要的作用是为客户端提供寻址的服务,协助客户端找到对应的Broker
NameServer还负责监控存活状态
NameServer支持集群或者多个节点组成一个集群,不过集群情况下,也需要保证每个NameServer都保存了所有的节点信息,可以独立提供服务
整体的架构如上
详细的说,就是每个Broker都需要和所有的NameServer联系,当Broker的Topic信息发生变化的时候,会通知所有的NameServer更新路由信息,为了保证数据的一致性,还会定期的推送数据,这个推送还作为心跳请求来确定健康的状态
正因为每个NameServer都保存了所有的节点信息,所以对于客户端,可以选择任何一个NameServer来获取路由信息,在生产或者消费信息之前,先去NameServer上获取路由的主题信息
实际的源代码如下
在整个RocketMQ源码中,NameServer占了一个单元
在其中,我们的入口基本可以从
DefaultRequestProcessor中入手,来查看注册流程.其中也负责处理Request的请求
我们直接看其中的代码
继承了NettyRequestProcessor
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
在processRequest方法中,我们利用swtich方法来进行处理
在检查到RequestCode为REGISTER_BROKER的时候,进行调用处理
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
由于版本号的不同,处理方式是不一样的
但是殊途同归,最终都是调用了
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
来进行注册我们看一下内部的实现
首先是RoteInfoManager这个类,内部维护了五个Map,分别是
//主题和队列信息,队列信息的类为QueueData,内部保存了brokerName,用于在下面的brokerAddrTable查找信息
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//保存了BrokerData
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
topicQueueTable保存了主题和队列之间的信息,队列中保存了brokerName
brokerAddrTable保存了brokerName对应的Broker信息,真实的Addrs
然后三个map分别是
brokerLiveTable,保存了每个Broker当前的动态信息
clusterAddrTable,集群和BrokerName的对应关心
filterServerTable,每个Broker对应的消息过滤服务地址
在调用的registerBroker方法中
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info(“new broker registered, {} HAServer: {}”, brokerAddr, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error(“registerBroker Exception”, e);
}
return result;
}
首先加上了一个写锁,避免更新冲突,模拟事务
然后依次的更新5个map中的数据
然后是客户端如何从其中获取到Broker的信息
客户端的连接方式有两种,一种是直接连接到NameService服务,获取到路由信息
一种是连接到集群中的Broker,进行转发查询
我们先说下,直接连接到NameService中的方法,这种方法中,客户端会定期获取Broker信息进行缓存
整体还是从RequestProcessor中入手
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
这种调用的是RouteInfoManager中的
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<String>();
List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
topicRouteData.setBrokerDatas(brokerDataList);
HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
topicRouteData.setFilterServerTable(filterServerMap);
try {
try {
this.lock.readLock().lockInterruptibly();
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
if (queueDataList != null) {
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
brokerNameSet.add(qd.getBrokerName());
}
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
List<String> filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error(“pickupTopicRouteData Exception”, e);
}
log.debug(“pickupTopicRouteData {} {}”, topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
return topicRouteData;
}
return null;
}
首先是获取了一个读锁,避免出现并发冲突
然后从topicQueueTable获取到topic对应的Broker
只要不为空,就尝试获取到BrokerName,然后拿到BrokerData
放入返回的结果中
最终释放读锁
我们在本节中说了RocketMQ的源码中的NameServer部分,用于帮助客户端找到对应的Broker
那么,我们想一下,NameServer中,每个节点之间不需要互通,有啥好处和坏处
1.越简单的设计,健壮性越好,越单一的功能,性能越好,这样设计必然提供了更高的性能
2,但是容易出现无法保证一致性的问题,但是我想,可能RMQ也不在乎