RMQ的消息过滤发生在消息消费的时候,PullMessageService默认从Broker拉取消息,执行相同的过滤逻辑,FilterServer过滤模式下,需要转换地址
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); } |
在其中获取到地址
private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
throws MQClientException { //获取到所有的TopicRoute ConcurrentMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable(); if (topicRouteTable != null) { //获取到TopicRouteData TopicRouteData topicRouteData = topicRouteTable.get(topic); //根据Addr获取到list List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr); if (list != null && !list.isEmpty()) { return list.get(randomNum() % list.size()); } } |