RMQ支持两种过滤模式,分别是类过滤模式和表达式模式,表达式分为了TAG和SQL92模式,类模式则是提价的时候提交一个过滤类到FilterService

TAG模式就是简单的定义消息标签,根据消息属性tag进行匹配,SQL模式则是实现类SQL模式的过滤

消息过滤表达式API简单如下

//根据ConsumerQueue判断消息是否匹配

boolean isMatchedByConsumeQueue(final Long tagsCode,

final ConsumeQueueExt.CqExtUnit cqExtUnit);

//根据commitLog文件的内容判断消息是否匹配

boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,

final Map<String, String> properties);

我们先说的是表达式版本的消息过滤机制

首先说ComsumeQueue的存储版本基本由 CommitLogOffset Size MessageTagHashcode三个部分组成

对于tags,ConsumeQueue中只存储hashCode,而且在Broker端,也只匹配tagsHashCode,只有在Consumer中,才会对比较tags的值,也就是在Consumer中才会实际的进行过滤

从这个角度,我们可以直接看Consumer端的代码,是如何实现过滤的

public void subscribe(String topic, String subExpression) throws MQClientException {

try {

//构建订阅信息 subscription

SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),

topic, subExpression);

//加入RebalanceImpl中,订阅以便队列负载

this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);

if (this.mQClientFactory != null) {

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

}

} catch (Exception e) {

throw new MQClientException(“subscription exception”, e);

}

}

构成了SubscriptionData,而SubscriptionData内部属性包含

//过滤模式

public final static String SUB_ALL = “*”;

//是否是类过滤

private boolean classFilterMode = false;

//消息主题

private String topic;

//消息过滤表达式

private String subString;

//tag集合

private Set<String> tagsSet = new HashSet<String>();

//hashcode集合

private Set<Integer> codeSet = new HashSet<Integer>();

//创建时间

private long subVersion = System.currentTimeMillis();

//过滤类型

private String expressionType = ExpressionType.TAG;

然后再拉取消息的时候,需要构建消息拉取标记

在此之中,我们需要设置subExpression classFilter

String subExpression = null;

boolean classFilter = false;

//获取一开始订阅的SubscriptionData

SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());

if (sd != null) {

//如果设置了要求拉取和订阅信息不是类模式

if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {

//消息过滤表达式的获取

subExpression = sd.getSubString();

}

//是不是类模式

classFilter = sd.isClassFilterMode();

}

//拼接sysFlag ,根据不同的条件是否满足,来进行拼接为一个int

int sysFlag = PullSysFlag.buildSysFlag(

commitOffsetEnable, // commitOffset

true, // suspend

subExpression != null, // subscription

classFilter // class filter

);

之后发给对应的Broker

在Broker端,在processRequest中,会解析订阅信息,构建SubscriptionData,代码如下

//获取订阅信息

SubscriptionData subscriptionData = null;

ConsumerFilterData consumerFilterData = null;

//解析获得

if (hasSubscriptionFlag) {

try {

//构建subsciptionData

subscriptionData = FilterAPI.build(

requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()

);

//如果不是Tag类型

if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {

//构建consumerFilterData

consumerFilterData = ConsumerFilterManager.build(

requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),

requestHeader.getExpressionType(), requestHeader.getSubVersion()

);

assert consumerFilterData != null;

}

然后根据消息主题,构建消息实体,如果不是TAG模式,还会构建ConsumerFilterData

做完上面说的问题之后,根据不同的Data构建实际的过滤器

//根据上面的订阅信息和订阅过滤信息,构建实际的过滤器

MessageFilter messageFilter;

//根据是不是支持重试,进行不同的过滤

if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {

messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,

this.brokerController.getConsumerFilterManager());

} else {

messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,

this.brokerController.getConsumerFilterManager());

}

将这个Filter传入MessageStore进行查询

在MessageStore中,首先是从ConsumeQueue中获取到消息

遍历ConsumeQueue的时候,判断tagsCode是否是通过Filter

//这一步是检查是否符合过滤条件

if (messageFilter != null

//不符合条件的话

&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {

if (getResult.getBufferTotalSize() == 0) {

status = GetMessageStatus.NO_MATCHED_MESSAGE;

}

//跳过

continue;

}

然后是遍历CommitLog的时候,判断CommitLog中的是否符合

//这一步检查CommitLog中的存储是否符合过滤

if (messageFilter != null

//这一步,tag模式下,isMatchedByCommitLog直接返回true,交由消费者端进行精确的匹配

&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {

if (getResult.getBufferTotalSize() == 0) {

status = GetMessageStatus.NO_MATCHED_MESSAGE;

}

// release…

selectResult.release();

continue;

}

在isMatchedByCommitLog过滤中,主要针对的是SQL92模式下的过滤,对于TAG模式和类模式,都是直接返回true的

对于消费者端,则是在PullAPIWrapper#processPuLlRequest中进行了处理

List<MessageExt> msgListFilterAgain = msgList;

if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {

msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());

for (MessageExt msg : msgList) {

if (msg.getTags() != null) {

if (subscriptionData.getTagsSet().contains(msg.getTags())) {

msgListFilterAgain.add(msg);

}

}

}

}

在获取到服务器端给的消息之后,进行解析,对tag进行判断,集合中包含消息的TAG则返回消费,不然就跳过

发表评论

邮箱地址不会被公开。 必填项已用*标注