从服务器如何在消息拉取的时候参与负载,这就是一个主要的问题,而对于消息的拉取,则是基于的MessageQueue这个数据结构,MessageQueue的数据类图如下

MessageQueue
private String topic

private String brokerName

private int queueId

对于读写分离的核心流程,在于根据broker查找到实际的brokerAddr的函数中

RMQ根据MessageQueue查找Broker地址的唯一依据是brokerName

MQ的brokerName一致,但是brokerId不同,主服务器brokerId为0,从brokerId大于0,RMQ提供对应的函数来根据brokerName找到对应的brokerAddr

一个brokerName对应着一组broker,他们的brokerId不同,主broker的id为0,从brokerId不相同

上面就是根据brokerName对应找到brokerId 从而获取brokerAddr的全过程

代码如下

FindBrokerResult findBrokerResult =

this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),

this.recalculatePullFromWhichNode(mq), false);

其中我们获取到

findBrokerAddressInSubscribe

函数中,传入的参数显而易见

brokerName,brokerId,onlyThisBroker->限定是否返回此broker对应的服务器信息

public FindBrokerResult findBrokerAddressInSubscribe(

final String brokerName,

final long brokerId,

final boolean onlyThisBroker

) {

String brokerAddr = null;

boolean slave = false;

boolean found = false;

HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);

//根据节点民称获取到所有的节点Id

if (map != null && !map.isEmpty()) {

//尝试获取到Id对应的Addr

brokerAddr = map.get(brokerId);

slave = brokerId != MixAll.MASTER_ID;

found = brokerAddr != null;

//如果没找到并且是Id是从节点

if (!found && slave) {

//从新获取

brokerAddr = map.get(brokerId + 1);

found = brokerAddr != null;

}

if (!found && !onlyThisBroker) {

//获取下一个brokerName中的addr

Entry<Long, String> entry = map.entrySet().iterator().next();

brokerAddr = entry.getValue();

slave = entry.getKey() != MixAll.MASTER_ID;

found = true;

}

}

//返回数据结构,包含Broker地址,是否是从节点 Broker版本

if (found) {

return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));

}

return null;

}

最终根据是否知道了地址,来组成返回数据结构,其中包含地址,名称等属性

在传入参数的时候,尝试根据brokerName获取到brokerId的函数为

recalculatePullFromWhichNode(),其函数实现为

public long recalculatePullFromWhichNode(final MessageQueue mq) {

//默认返回主节点

if (this.isConnectBrokerByUser()) {

return this.defaultBrokerId;

}

//从此缓存表获取

AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);

if (suggest != null) {

return suggest.get();

}

return MixAll.MASTER_ID;

}

对于此缓存表中的数据,从何而来呢?

这一缓存表的数据来源自PullMessageService在从主服务器拉取消息之后,会计算建议下一次拉取的brokerId,从而更新这个PullMessageService

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,

final SubscriptionData subscriptionData) {

PullResultExt pullResultExt = (PullResultExt) pullResult;

this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());

上面就是根据建议的brokerId更新缓存表

而更新的规则则在Broker上

在主Broker上返回建议BrokerId的地方为

//normal流程中

//最大偏移量减去此次拉取的最大偏移量

long diff = maxOffsetPy – maxPhyOffsetPulling;

//获取本机最大内存乘以能使用的最大比例,从而获取到一个可以使用内存总量

long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE

* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));

//看是不是接下来需要拉取的消息超过了常驻消息

getResult.setSuggestPullingFromSlave(diff > memory);

如果设置了需要去从Broker上拉取,那么就返回如下的BrokerId

if (getMessageResult.isSuggestPullingFromSlave()) {

responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());

}

上面返回了配置中建议资源不足时去拉取的BrokerId,默认为1,如果一个Master拥有多个服务器,参与拉取的从服务器只会是其中之一

关于本章,因为总体并不长,所以可以简单的总结一下

RMQ的HA机制,是从服务器在启动的时候往主服务器建立一个Channel,然后获取到commitLog的最大偏移量,利用本服务器的最大偏移量向着主服务器拉取消息

主服务器根据从服务器的偏移量和自身commitlog的最大偏移量进行比较,然后来返回一定数量的数据,直到主从同步完成

RMQ的读写分离,则是消费者先向主服务器发起一个消息拉去,然后主服务器返回一批数据,并且主服务器根据自身负载情况,返回下次拉取是从主服务器还是从服务器拉取

发表评论

邮箱地址不会被公开。 必填项已用*标注