RMQ通过DeafultMQPushConsumerImpl的subscribe来实现类模式的消息过滤
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {} |
在其中的参数包含 topic 类名 类源代码
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
try { SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, “*”); subscriptionData.setSubString(fullClassName); subscriptionData.setClassFilterMode(true); subscriptionData.setFilterClassSource(filterClassSource); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); } } catch (Exception e) { throw new MQClientException(“subscription exception”, e); } } |
首先,我们构建了订阅信息,将订阅信息添加到RebalanceImpl,主要目标是RebalanceImpl会对订阅信息表中的主题进行消息队列的负载,创建消息拉取任务
然后定期的将过滤类的源码上传到FilterServer
正是上面的sendHeartbeatToAllBrokerWithLock()
在其中调用了MQClientInstance#uploadFilterClassSourceToAllFilterServer,我们看下如何上传
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) { Iterator<Entry<String, List<String>>> it = topicRouteData.getFilterServerTable().entrySet().iterator(); while (it.hasNext()) { Entry<String, List<String>> next = it.next(); List<String> value = next.getValue(); for (final String fsAddr : value) { //获取到每个Addr try { //调用实际的网络交互,进行注册 this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody, 5000); log.info(“register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}”, fsAddr, consumerGroup, topic, fullClassName); } catch (Exception e) { log.error(“uploadFilterClassToAllFilterServer Exception”, e); } } } } |
上面只是获取到缓存在FilterServerTable中的各个Server的Addr,然后调用实际的交互Client发送注册信息
到此,客户端发送了订阅信息,我们看下对应的FilterServer如何实现
FilterClassMananger#registerFilterClass
在整个FilterServer中,维护了filterClassTable的Map数据结构
那么,根据ConsumerGroup和topic,组合作为key,从其中尝试取出FilterClassInfo
boolean registerNew = false;
FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key); |
然后尝试判断是否更新Filter
//没注册过
if (null == filterClassInfoPrev) { registerNew = true; } else { //可以更新FilterClass if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { //尝试获取到CRC,进行对比 if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { registerNew = true; } } } |
如果有更新或者新注册的,那么就更新FilterServerInfoTable并创建实例
//如果是有更新,或者新注册
if (registerNew) { //尝试锁住 synchronized (this.compileLock) { filterClassInfoPrev = this.filterClassTable.get(key); if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) { return true; } try { //创建新的FilterClassInfo FilterClassInfo filterClassInfoNew = new FilterClassInfo(); filterClassInfoNew.setClassName(className); filterClassInfoNew.setClassCRC(0); filterClassInfoNew.setMessageFilter(null); //创建实例 if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET); Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource); Object newInstance = newClass.newInstance(); filterClassInfoNew.setMessageFilter((MessageFilter) newInstance); filterClassInfoNew.setClassCRC(classCRC); } //放入 this.filterClassTable.put(key, filterClassInfoNew); } catch (Throwable e) { String info = String .format( “FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s”, consumerGroup, topic, className); log.error(info, e); return false; } } } |
在创建完成实例后,强制转换类型为MessageFilter,然后放入
上面就是一个注册类的全过程