消息的消费是以组的模式开展,一个消费组可以有多个消费者,一个消费组可以订阅多个主题
自我总结如下
消费组之间有集群模式和广播模式两种,集群模式,主题下的同一条消息只能有一个消费者消费
广播模式 主题下同一条消息会推送 给集群内所有消费者消费一次
消息服务器和消费者之间的消息推送有两种模式,推模式和拉模式
拉模式不必说,消费者主动拉起,而推模式,在RMQ的实现则是基于了拉模式,在拉模式上面封装了一层
对于多个消费者如何对消息队列进行负载的问题,RMQ遵循了一个通用的思想,一个消息队列同一时间只能有一个消费者消费,一个消费者可以消费多个消息队列
RMQ只支持局部消息顺序消费,即只保证一个消息队列上的消息顺序消费
如果需要保证消息的全局消费有序,则可以将主题的队列数设置为1,牺牲高可用性
我们初次了解一下消息消费者
消息消费者的模式有两种
推和拉,我们主要介绍推模式的消费者MQPushConsumer的主要API
常见的接口如下
sendMessageBack 消息消费失败的时候,重新发送到Broker服务器
fetchSubscribeMessageQueues 获取本端,即客户端对主题topic分配了哪些消息队列
registerMessageListener(final MessageListenerConcurrently) 注册并发消息事件监听器
registerMessageListener(final MessageListenerOrder) 注册顺序消息事件监听器
subscribe 订阅消息 支持传入SQL表达式 like between如此
还有着重载 传入MessageSelector,内置一个Expression
实际的实现者如下
内部的属性有:
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
实际的内部实现委托类
private String consumerGroup;
消费者所属组,一组有多个消费者
private MessageModel messageModel = MessageModel.CLUSTERING;
消息消费者模式,默认是集群模式,还有广播模式
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
如果拉取不到消息的时候重新计算下策略
包括,从队列的最大偏移量开始消费
从队列的最小偏移量开始消费
启动时间戳开始消费
这个策略的生效取决于读到的偏移量,如果从MessageQueue中的偏移量不小于0,使用读到的偏移量,不然才是策略生效
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
集群模式下的消息队列负载策略
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
订阅消息
private MessageListener messageListener;
消息业务监听器
private OffsetStore offsetStore;
消费进度存储器
/**
* Minimum consumer thread number
*/
private int consumeThreadMin = 20;
/**
* Max consumer thread number
*/
private int consumeThreadMax = 20;
最大最小线程数,消费者线程池无界队列,故消费者只有核心线程数,也就是20个
/**
* Concurrently max span offset.it has no effect on sequential consumption
*/
private int consumeConcurrentlyMaxSpan = 2000;
并发消费的时候的最大跨度,如果偏移量最大的消息和偏移量最小的消息跨度超过了2000,就延迟50毫秒
private int pullThresholdForQueue = 1000;
默认值1000,每1000次流控后打印流控日志
private long pullInterval = 0;
推模式下的任务间隔时间
private int consumeMessageBatchMaxSize = 1;
并发消费的时候一次消费的消息条数
private int pullBatchSize = 32;
消息拉取所拉取的条数,默认32条数
private boolean postSubscriptionWhenPull = false;
每次拉取都更新订阅消息
private int maxReconsumeTimes = -1;
最大消费重试次数
private long consumeTimeout = 15;
消息消费超时时间 默认15分钟
private long suspendCurrentQueueTimeMillis = 1000;
延迟将消息提交给消费者的等待时间,默认延迟1S
private long consumeTimeout = 15;
消息消费超时时间,默认为15,单位为分钟
具体的内部实现类
DefaultMQPushConsumerImpl
其在DefaultMQPushConsumer的start中进行启动,其中调用了DefaultMQPushConsumerImpl的start
case CREATE_JUST:
log.info(“the consumer [{}] start beginning. messageModel={}, isUnitMode={}”, this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED; this.checkConfig(); this.copySubscription(); |
在start函数中,还是先将其设置为了Failed的状态
然后首先进行订阅的信息的拷贝
在copySubscription函数中进行的实现
try {
//获取订阅信息,主要来源有两个 //1.从本类的subscribe中可以获取 Map<String, String> sub = this.defaultMQPushConsumer.getSubscription(); if (sub != null) { for (final Map.Entry<String, String> entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subString); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } if (null == this.messageListenerInner) { this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); //构建重试主题消息,以消费组为主,消费者启动的时候自动订阅这个主题,参与其消息队列负载 SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } } catch (Exception e) { throw new MQClientException(“subscription exception”, e); } |
返回来继续进行初始化
//初始化Instance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); //初始化RebalanceImple(消息重新实现类) this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); |
进行初始化偏移量
//初始化偏移量
if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { //根据不同的消息消费模式来进行不同的初始化方式 case BROADCASTING: //广播模式,保存在本地 this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: //如果是集群模式,保存在远端 this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } //重载offset this.offsetStore.load(); |
如果是消息消费是集群模式,那么消息进度保存在本地,集群模式保存在远端
//获取是否是顺序消费
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = //创建不同的Service new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } |
开启消息消费线程
this.consumeMessageService.start(); |
尝试注册消费者
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown()); throw new MQClientException(“The consumer group[” + this.defaultMQPushConsumer.getConsumerGroup() + “] has been created before, specify another name please.” + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } |
上面的mqClinetFactory JVM内唯一,用的是static
这就基本完成Service的start流程了,接下来是消息的正式拉取了