RMQ的消息过滤发生在消息消费的时候,PullMessageService默认从Broker拉取消息,执行相同的过滤逻辑,FilterServer过滤模式下,需要转换地址

if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {

brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);

}

在其中获取到地址

private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)

throws MQClientException {

//获取到所有的TopicRoute

ConcurrentMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();

if (topicRouteTable != null) {

//获取到TopicRouteData

TopicRouteData topicRouteData = topicRouteTable.get(topic);

//根据Addr获取到list

List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);

if (list != null && !list.isEmpty()) {

return list.get(randomNum() % list.size());

}

}

发表评论

邮箱地址不会被公开。 必填项已用*标注