RMQ支持两种过滤模式,分别是类过滤模式和表达式模式,表达式分为了TAG和SQL92模式,类模式则是提价的时候提交一个过滤类到FilterService
TAG模式就是简单的定义消息标签,根据消息属性tag进行匹配,SQL模式则是实现类SQL模式的过滤
消息过滤表达式API简单如下
//根据ConsumerQueue判断消息是否匹配
boolean isMatchedByConsumeQueue(final Long tagsCode, final ConsumeQueueExt.CqExtUnit cqExtUnit); //根据commitLog文件的内容判断消息是否匹配 boolean isMatchedByCommitLog(final ByteBuffer msgBuffer, final Map<String, String> properties); |
我们先说的是表达式版本的消息过滤机制
首先说ComsumeQueue的存储版本基本由 CommitLogOffset Size MessageTagHashcode三个部分组成
对于tags,ConsumeQueue中只存储hashCode,而且在Broker端,也只匹配tagsHashCode,只有在Consumer中,才会对比较tags的值,也就是在Consumer中才会实际的进行过滤
从这个角度,我们可以直接看Consumer端的代码,是如何实现过滤的
public void subscribe(String topic, String subExpression) throws MQClientException {
try { //构建订阅信息 subscription SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression); //加入RebalanceImpl中,订阅以便队列负载 this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); } } catch (Exception e) { throw new MQClientException(“subscription exception”, e); } } |
构成了SubscriptionData,而SubscriptionData内部属性包含
//过滤模式
public final static String SUB_ALL = “*”; //是否是类过滤 private boolean classFilterMode = false; //消息主题 private String topic; //消息过滤表达式 private String subString; //tag集合 private Set<String> tagsSet = new HashSet<String>(); //hashcode集合 private Set<Integer> codeSet = new HashSet<Integer>(); //创建时间 private long subVersion = System.currentTimeMillis(); //过滤类型 private String expressionType = ExpressionType.TAG; |
然后再拉取消息的时候,需要构建消息拉取标记
在此之中,我们需要设置subExpression classFilter
String subExpression = null;
boolean classFilter = false; //获取一开始订阅的SubscriptionData SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (sd != null) { //如果设置了要求拉取和订阅信息不是类模式 if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { //消息过滤表达式的获取 subExpression = sd.getSubString(); } //是不是类模式 classFilter = sd.isClassFilterMode(); } //拼接sysFlag ,根据不同的条件是否满足,来进行拼接为一个int int sysFlag = PullSysFlag.buildSysFlag( commitOffsetEnable, // commitOffset true, // suspend subExpression != null, // subscription classFilter // class filter ); |
之后发给对应的Broker
在Broker端,在processRequest中,会解析订阅信息,构建SubscriptionData,代码如下
//获取订阅信息
SubscriptionData subscriptionData = null; ConsumerFilterData consumerFilterData = null; //解析获得 if (hasSubscriptionFlag) { try { //构建subsciptionData subscriptionData = FilterAPI.build( requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType() ); //如果不是Tag类型 if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) { //构建consumerFilterData consumerFilterData = ConsumerFilterManager.build( requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(), requestHeader.getExpressionType(), requestHeader.getSubVersion() ); assert consumerFilterData != null; } |
然后根据消息主题,构建消息实体,如果不是TAG模式,还会构建ConsumerFilterData
做完上面说的问题之后,根据不同的Data构建实际的过滤器
//根据上面的订阅信息和订阅过滤信息,构建实际的过滤器
MessageFilter messageFilter; //根据是不是支持重试,进行不同的过滤 if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) { messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager()); } else { messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager()); } |
将这个Filter传入MessageStore进行查询
在MessageStore中,首先是从ConsumeQueue中获取到消息
遍历ConsumeQueue的时候,判断tagsCode是否是通过Filter
//这一步是检查是否符合过滤条件
if (messageFilter != null //不符合条件的话 && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } //跳过 continue; } |
然后是遍历CommitLog的时候,判断CommitLog中的是否符合
//这一步检查CommitLog中的存储是否符合过滤
if (messageFilter != null //这一步,tag模式下,isMatchedByCommitLog直接返回true,交由消费者端进行精确的匹配 && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } // release… selectResult.release(); continue; } |
在isMatchedByCommitLog过滤中,主要针对的是SQL92模式下的过滤,对于TAG模式和类模式,都是直接返回true的
对于消费者端,则是在PullAPIWrapper#processPuLlRequest中进行了处理
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { msgListFilterAgain = new ArrayList<MessageExt>(msgList.size()); for (MessageExt msg : msgList) { if (msg.getTags() != null) { if (subscriptionData.getTagsSet().contains(msg.getTags())) { msgListFilterAgain.add(msg); } } } } |
在获取到服务器端给的消息之后,进行解析,对tag进行判断,集合中包含消息的TAG则返回消费,不然就跳过