RMQ通过DeafultMQPushConsumerImpl的subscribe来实现类模式的消息过滤

public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {}

在其中的参数包含 topic 类名 类源代码

public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {

try {

SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),

topic, “*”);

subscriptionData.setSubString(fullClassName);

subscriptionData.setClassFilterMode(true);

subscriptionData.setFilterClassSource(filterClassSource);

this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);

if (this.mQClientFactory != null) {

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

}

} catch (Exception e) {

throw new MQClientException(“subscription exception”, e);

}

}

首先,我们构建了订阅信息,将订阅信息添加到RebalanceImpl,主要目标是RebalanceImpl会对订阅信息表中的主题进行消息队列的负载,创建消息拉取任务

然后定期的将过滤类的源码上传到FilterServer

正是上面的sendHeartbeatToAllBrokerWithLock()

在其中调用了MQClientInstance#uploadFilterClassSourceToAllFilterServer,我们看下如何上传

TopicRouteData topicRouteData = this.topicRouteTable.get(topic);

if (topicRouteData != null

&& topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {

Iterator<Entry<String, List<String>>> it = topicRouteData.getFilterServerTable().entrySet().iterator();

while (it.hasNext()) {

Entry<String, List<String>> next = it.next();

List<String> value = next.getValue();

for (final String fsAddr : value) {

//获取到每个Addr

try {

//调用实际的网络交互,进行注册

this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody,

5000);

log.info(“register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}”, fsAddr, consumerGroup,

topic, fullClassName);

} catch (Exception e) {

log.error(“uploadFilterClassToAllFilterServer Exception”, e);

}

}

}

}

上面只是获取到缓存在FilterServerTable中的各个Server的Addr,然后调用实际的交互Client发送注册信息

到此,客户端发送了订阅信息,我们看下对应的FilterServer如何实现

FilterClassMananger#registerFilterClass

在整个FilterServer中,维护了filterClassTable的Map数据结构

那么,根据ConsumerGroup和topic,组合作为key,从其中尝试取出FilterClassInfo

boolean registerNew = false;

FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);

然后尝试判断是否更新Filter

//没注册过

if (null == filterClassInfoPrev) {

registerNew = true;

} else {

//可以更新FilterClass

if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {

//尝试获取到CRC,进行对比

if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) {

registerNew = true;

}

}

}

如果有更新或者新注册的,那么就更新FilterServerInfoTable并创建实例

//如果是有更新,或者新注册

if (registerNew) {

//尝试锁住

synchronized (this.compileLock) {

filterClassInfoPrev = this.filterClassTable.get(key);

if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {

return true;

}

try {

//创建新的FilterClassInfo

FilterClassInfo filterClassInfoNew = new FilterClassInfo();

filterClassInfoNew.setClassName(className);

filterClassInfoNew.setClassCRC(0);

filterClassInfoNew.setMessageFilter(null);

//创建实例

if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {

String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);

Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);

Object newInstance = newClass.newInstance();

filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);

filterClassInfoNew.setClassCRC(classCRC);

}

//放入

this.filterClassTable.put(key, filterClassInfoNew);

} catch (Throwable e) {

String info =

String

.format(

“FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s”,

consumerGroup, topic, className);

log.error(info, e);

return false;

}

}

}

在创建完成实例后,强制转换类型为MessageFilter,然后放入

上面就是一个注册类的全过程

发表评论

邮箱地址不会被公开。 必填项已用*标注