• RocketMQ的生产者在使用过程中,只要配置一个接入的地址,就可以使用整个的集群

RokcetMQ内部进行了确定访问的主题名称和队列序号,找到对应的Broker地址

那么在这其中,NameServer起到了重要的作用

NameServer作为一个通用的概念,在多数的分布式集群中都有出现,无论是Dubbo还是Flink等系统

我们来看下NameServer的设计思想

NameServer主要的作用是为客户端提供寻址的服务,协助客户端找到对应的Broker

NameServer还负责监控存活状态

NameServer支持集群或者多个节点组成一个集群,不过集群情况下,也需要保证每个NameServer都保存了所有的节点信息,可以独立提供服务

图片

整体的架构如上

详细的说,就是每个Broker都需要和所有的NameServer联系,当Broker的Topic信息发生变化的时候,会通知所有的NameServer更新路由信息,为了保证数据的一致性,还会定期的推送数据,这个推送还作为心跳请求来确定健康的状态

正因为每个NameServer都保存了所有的节点信息,所以对于客户端,可以选择任何一个NameServer来获取路由信息,在生产或者消费信息之前,先去NameServer上获取路由的主题信息

实际的源代码如下

在整个RocketMQ源码中,NameServer占了一个单元

图片

在其中,我们的入口基本可以从

DefaultRequestProcessor中入手,来查看注册流程.其中也负责处理Request的请求

我们直接看其中的代码

继承了NettyRequestProcessor

switch (request.getCode()) {

case RequestCode.PUT_KV_CONFIG:

return this.putKVConfig(ctx, request);

case RequestCode.GET_KV_CONFIG:

return this.getKVConfig(ctx, request);

case RequestCode.DELETE_KV_CONFIG:

return this.deleteKVConfig(ctx, request);

case RequestCode.QUERY_DATA_VERSION:

return queryBrokerTopicConfig(ctx, request);

case RequestCode.REGISTER_BROKER:

Version brokerVersion = MQVersion.value2Version(request.getVersion());

if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {

return this.registerBrokerWithFilterServer(ctx, request);

} else {

return this.registerBroker(ctx, request);

}

case RequestCode.UNREGISTER_BROKER:

return this.unregisterBroker(ctx, request);

case RequestCode.GET_ROUTEINFO_BY_TOPIC:

return this.getRouteInfoByTopic(ctx, request);

case RequestCode.GET_BROKER_CLUSTER_INFO:

return this.getBrokerClusterInfo(ctx, request);

case RequestCode.WIPE_WRITE_PERM_OF_BROKER:

return this.wipeWritePermOfBroker(ctx, request);

case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:

return getAllTopicListFromNameserver(ctx, request);

case RequestCode.DELETE_TOPIC_IN_NAMESRV:

return deleteTopicInNamesrv(ctx, request);

case RequestCode.GET_KVLIST_BY_NAMESPACE:

return this.getKVListByNamespace(ctx, request);

case RequestCode.GET_TOPICS_BY_CLUSTER:

return this.getTopicsByCluster(ctx, request);

case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:

return this.getSystemTopicListFromNs(ctx, request);

case RequestCode.GET_UNIT_TOPIC_LIST:

return this.getUnitTopicList(ctx, request);

case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:

return this.getHasUnitSubTopicList(ctx, request);

case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:

return this.getHasUnitSubUnUnitTopicList(ctx, request);

case RequestCode.UPDATE_NAMESRV_CONFIG:

return this.updateConfig(ctx, request);

case RequestCode.GET_NAMESRV_CONFIG:

return this.getConfig(ctx, request);

default:

break;

}

return null;

}

在processRequest方法中,我们利用swtich方法来进行处理

在检查到RequestCode为REGISTER_BROKER的时候,进行调用处理

case RequestCode.REGISTER_BROKER:

Version brokerVersion = MQVersion.value2Version(request.getVersion());

if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {

return this.registerBrokerWithFilterServer(ctx, request);

} else {

return this.registerBroker(ctx, request);

}

由于版本号的不同,处理方式是不一样的

但是殊途同归,最终都是调用了

RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(

requestHeader.getClusterName(),

requestHeader.getBrokerAddr(),

requestHeader.getBrokerName(),

requestHeader.getBrokerId(),

requestHeader.getHaServerAddr(),

topicConfigWrapper,

null,

ctx.channel()

);

来进行注册我们看一下内部的实现

首先是RoteInfoManager这个类,内部维护了五个Map,分别是

//主题和队列信息,队列信息的类为QueueData,内部保存了brokerName,用于在下面的brokerAddrTable查找信息

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

//保存了BrokerData

private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

topicQueueTable保存了主题和队列之间的信息,队列中保存了brokerName

brokerAddrTable保存了brokerName对应的Broker信息,真实的Addrs

然后三个map分别是

brokerLiveTable,保存了每个Broker当前的动态信息

clusterAddrTable,集群和BrokerName的对应关心

filterServerTable,每个Broker对应的消息过滤服务地址

在调用的registerBroker方法中

public RegisterBrokerResult registerBroker(

final String clusterName,

final String brokerAddr,

final String brokerName,

final long brokerId,

final String haServerAddr,

final TopicConfigSerializeWrapper topicConfigWrapper,

final List<String> filterServerList,

final Channel channel) {

RegisterBrokerResult result = new RegisterBrokerResult();

try {

try {

this.lock.writeLock().lockInterruptibly();

Set<String> brokerNames = this.clusterAddrTable.get(clusterName);

if (null == brokerNames) {

brokerNames = new HashSet<String>();

this.clusterAddrTable.put(clusterName, brokerNames);

}

brokerNames.add(brokerName);

boolean registerFirst = false;

BrokerData brokerData = this.brokerAddrTable.get(brokerName);

if (null == brokerData) {

registerFirst = true;

brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());

this.brokerAddrTable.put(brokerName, brokerData);

}

Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();

//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>

//The same IP:PORT must only have one record in brokerAddrTable

Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();

while (it.hasNext()) {

Entry<Long, String> item = it.next();

if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {

it.remove();

}

}

String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);

registerFirst = registerFirst || (null == oldAddr);

if (null != topicConfigWrapper

&& MixAll.MASTER_ID == brokerId) {

if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())

|| registerFirst) {

ConcurrentMap<String, TopicConfig> tcTable =

topicConfigWrapper.getTopicConfigTable();

if (tcTable != null) {

for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {

this.createAndUpdateQueueData(brokerName, entry.getValue());

}

}

}

}

BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,

new BrokerLiveInfo(

System.currentTimeMillis(),

topicConfigWrapper.getDataVersion(),

channel,

haServerAddr));

if (null == prevBrokerLiveInfo) {

log.info(“new broker registered, {} HAServer: {}”, brokerAddr, haServerAddr);

}

if (filterServerList != null) {

if (filterServerList.isEmpty()) {

this.filterServerTable.remove(brokerAddr);

} else {

this.filterServerTable.put(brokerAddr, filterServerList);

}

}

if (MixAll.MASTER_ID != brokerId) {

String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);

if (masterAddr != null) {

BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);

if (brokerLiveInfo != null) {

result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());

result.setMasterAddr(masterAddr);

}

}

}

} finally {

this.lock.writeLock().unlock();

}

} catch (Exception e) {

log.error(“registerBroker Exception”, e);

}

return result;

}

首先加上了一个写锁,避免更新冲突,模拟事务

然后依次的更新5个map中的数据

然后是客户端如何从其中获取到Broker的信息

客户端的连接方式有两种,一种是直接连接到NameService服务,获取到路由信息

一种是连接到集群中的Broker,进行转发查询

我们先说下,直接连接到NameService中的方法,这种方法中,客户端会定期获取Broker信息进行缓存

整体还是从RequestProcessor中入手

case RequestCode.GET_ROUTEINFO_BY_TOPIC:

return this.getRouteInfoByTopic(ctx, request);

这种调用的是RouteInfoManager中的

public TopicRouteData pickupTopicRouteData(final String topic) {

TopicRouteData topicRouteData = new TopicRouteData();

boolean foundQueueData = false;

boolean foundBrokerData = false;

Set<String> brokerNameSet = new HashSet<String>();

List<BrokerData> brokerDataList = new LinkedList<BrokerData>();

topicRouteData.setBrokerDatas(brokerDataList);

HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();

topicRouteData.setFilterServerTable(filterServerMap);

try {

try {

this.lock.readLock().lockInterruptibly();

List<QueueData> queueDataList = this.topicQueueTable.get(topic);

if (queueDataList != null) {

topicRouteData.setQueueDatas(queueDataList);

foundQueueData = true;

Iterator<QueueData> it = queueDataList.iterator();

while (it.hasNext()) {

QueueData qd = it.next();

brokerNameSet.add(qd.getBrokerName());

}

for (String brokerName : brokerNameSet) {

BrokerData brokerData = this.brokerAddrTable.get(brokerName);

if (null != brokerData) {

BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData

.getBrokerAddrs().clone());

brokerDataList.add(brokerDataClone);

foundBrokerData = true;

for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {

List<String> filterServerList = this.filterServerTable.get(brokerAddr);

filterServerMap.put(brokerAddr, filterServerList);

}

}

}

}

} finally {

this.lock.readLock().unlock();

}

} catch (Exception e) {

log.error(“pickupTopicRouteData Exception”, e);

}

log.debug(“pickupTopicRouteData {} {}”, topic, topicRouteData);

if (foundBrokerData && foundQueueData) {

return topicRouteData;

}

return null;

}

首先是获取了一个读锁,避免出现并发冲突

然后从topicQueueTable获取到topic对应的Broker

只要不为空,就尝试获取到BrokerName,然后拿到BrokerData

放入返回的结果中

最终释放读锁

我们在本节中说了RocketMQ的源码中的NameServer部分,用于帮助客户端找到对应的Broker

那么,我们想一下,NameServer中,每个节点之间不需要互通,有啥好处和坏处

1.越简单的设计,健壮性越好,越单一的功能,性能越好,这样设计必然提供了更高的性能

2,但是容易出现无法保证一致性的问题,但是我想,可能RMQ也不在乎

发表评论

邮箱地址不会被公开。 必填项已用*标注