从服务器如何在消息拉取的时候参与负载,这就是一个主要的问题,而对于消息的拉取,则是基于的MessageQueue这个数据结构,MessageQueue的数据类图如下
MessageQueue |
private String topic
private String brokerName private int queueId |
对于读写分离的核心流程,在于根据broker查找到实际的brokerAddr的函数中
RMQ根据MessageQueue查找Broker地址的唯一依据是brokerName
MQ的brokerName一致,但是brokerId不同,主服务器brokerId为0,从brokerId大于0,RMQ提供对应的函数来根据brokerName找到对应的brokerAddr
一个brokerName对应着一组broker,他们的brokerId不同,主broker的id为0,从brokerId不相同
上面就是根据brokerName对应找到brokerId 从而获取brokerAddr的全过程
代码如下
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); |
其中我们获取到
findBrokerAddressInSubscribe
函数中,传入的参数显而易见
brokerName,brokerId,onlyThisBroker->限定是否返回此broker对应的服务器信息
public FindBrokerResult findBrokerAddressInSubscribe(
final String brokerName, final long brokerId, final boolean onlyThisBroker ) { String brokerAddr = null; boolean slave = false; boolean found = false; HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName); //根据节点民称获取到所有的节点Id if (map != null && !map.isEmpty()) { //尝试获取到Id对应的Addr brokerAddr = map.get(brokerId); slave = brokerId != MixAll.MASTER_ID; found = brokerAddr != null; //如果没找到并且是Id是从节点 if (!found && slave) { //从新获取 brokerAddr = map.get(brokerId + 1); found = brokerAddr != null; } if (!found && !onlyThisBroker) { //获取下一个brokerName中的addr Entry<Long, String> entry = map.entrySet().iterator().next(); brokerAddr = entry.getValue(); slave = entry.getKey() != MixAll.MASTER_ID; found = true; } } //返回数据结构,包含Broker地址,是否是从节点 Broker版本 if (found) { return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr)); } return null; } |
最终根据是否知道了地址,来组成返回数据结构,其中包含地址,名称等属性
在传入参数的时候,尝试根据brokerName获取到brokerId的函数为
recalculatePullFromWhichNode(),其函数实现为
public long recalculatePullFromWhichNode(final MessageQueue mq) {
//默认返回主节点 if (this.isConnectBrokerByUser()) { return this.defaultBrokerId; } //从此缓存表获取 AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); if (suggest != null) { return suggest.get(); } return MixAll.MASTER_ID; } |
对于此缓存表中的数据,从何而来呢?
这一缓存表的数据来源自PullMessageService在从主服务器拉取消息之后,会计算建议下一次拉取的brokerId,从而更新这个PullMessageService
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) { PullResultExt pullResultExt = (PullResultExt) pullResult; this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); |
上面就是根据建议的brokerId更新缓存表
而更新的规则则在Broker上
在主Broker上返回建议BrokerId的地方为
//normal流程中
//最大偏移量减去此次拉取的最大偏移量 long diff = maxOffsetPy – maxPhyOffsetPulling; //获取本机最大内存乘以能使用的最大比例,从而获取到一个可以使用内存总量 long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); //看是不是接下来需要拉取的消息超过了常驻消息 getResult.setSuggestPullingFromSlave(diff > memory); |
如果设置了需要去从Broker上拉取,那么就返回如下的BrokerId
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } |
上面返回了配置中建议资源不足时去拉取的BrokerId,默认为1,如果一个Master拥有多个服务器,参与拉取的从服务器只会是其中之一
关于本章,因为总体并不长,所以可以简单的总结一下
RMQ的HA机制,是从服务器在启动的时候往主服务器建立一个Channel,然后获取到commitLog的最大偏移量,利用本服务器的最大偏移量向着主服务器拉取消息
主服务器根据从服务器的偏移量和自身commitlog的最大偏移量进行比较,然后来返回一定数量的数据,直到主从同步完成
RMQ的读写分离,则是消费者先向主服务器发起一个消息拉去,然后主服务器返回一批数据,并且主服务器根据自身负载情况,返回下次拉取是从主服务器还是从服务器拉取