消息的消费是以组的模式开展,一个消费组可以有多个消费者,一个消费组可以订阅多个主题

自我总结如下

图片

消费组之间有集群模式和广播模式两种,集群模式,主题下的同一条消息只能有一个消费者消费

广播模式 主题下同一条消息会推送 给集群内所有消费者消费一次

消息服务器和消费者之间的消息推送有两种模式,推模式和拉模式

拉模式不必说,消费者主动拉起,而推模式,在RMQ的实现则是基于了拉模式,在拉模式上面封装了一层

对于多个消费者如何对消息队列进行负载的问题,RMQ遵循了一个通用的思想,一个消息队列同一时间只能有一个消费者消费,一个消费者可以消费多个消息队列

RMQ只支持局部消息顺序消费,即只保证一个消息队列上的消息顺序消费

如果需要保证消息的全局消费有序,则可以将主题的队列数设置为1,牺牲高可用性

我们初次了解一下消息消费者

消息消费者的模式有两种

推和拉,我们主要介绍推模式的消费者MQPushConsumer的主要API

常见的接口如下

图片

sendMessageBack 消息消费失败的时候,重新发送到Broker服务器

fetchSubscribeMessageQueues 获取本端,即客户端对主题topic分配了哪些消息队列

图片

registerMessageListener(final MessageListenerConcurrently) 注册并发消息事件监听器

registerMessageListener(final MessageListenerOrder) 注册顺序消息事件监听器

subscribe 订阅消息 支持传入SQL表达式 like between如此

还有着重载 传入MessageSelector,内置一个Expression

实际的实现者如下

内部的属性有:

protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

实际的内部实现委托类

private String consumerGroup;

消费者所属组,一组有多个消费者

private MessageModel messageModel = MessageModel.CLUSTERING;

消息消费者模式,默认是集群模式,还有广播模式

private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

如果拉取不到消息的时候重新计算下策略

包括,从队列的最大偏移量开始消费

从队列的最小偏移量开始消费

启动时间戳开始消费

这个策略的生效取决于读到的偏移量,如果从MessageQueue中的偏移量不小于0,使用读到的偏移量,不然才是策略生效

private AllocateMessageQueueStrategy allocateMessageQueueStrategy;

集群模式下的消息队列负载策略

private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();

订阅消息

private MessageListener messageListener;

消息业务监听器

private OffsetStore offsetStore;

消费进度存储器

/**

* Minimum consumer thread number

*/

private int consumeThreadMin = 20;

/**

* Max consumer thread number

*/

private int consumeThreadMax = 20;

最大最小线程数,消费者线程池无界队列,故消费者只有核心线程数,也就是20个

/**

* Concurrently max span offset.it has no effect on sequential consumption

*/

private int consumeConcurrentlyMaxSpan = 2000;

并发消费的时候的最大跨度,如果偏移量最大的消息和偏移量最小的消息跨度超过了2000,就延迟50毫秒

private int pullThresholdForQueue = 1000;

默认值1000,每1000次流控后打印流控日志

private long pullInterval = 0;

推模式下的任务间隔时间

private int consumeMessageBatchMaxSize = 1;

并发消费的时候一次消费的消息条数

private int pullBatchSize = 32;

消息拉取所拉取的条数,默认32条数

private boolean postSubscriptionWhenPull = false;

每次拉取都更新订阅消息

private int maxReconsumeTimes = -1;

最大消费重试次数

private long consumeTimeout = 15;

消息消费超时时间 默认15分钟

private long suspendCurrentQueueTimeMillis = 1000;

延迟将消息提交给消费者的等待时间,默认延迟1S

private long consumeTimeout = 15;

消息消费超时时间,默认为15,单位为分钟

具体的内部实现类

DefaultMQPushConsumerImpl

其在DefaultMQPushConsumer的start中进行启动,其中调用了DefaultMQPushConsumerImpl的start

case CREATE_JUST:

log.info(“the consumer [{}] start beginning. messageModel={}, isUnitMode={}”, this.defaultMQPushConsumer.getConsumerGroup(),

this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());

this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

this.copySubscription();

在start函数中,还是先将其设置为了Failed的状态

然后首先进行订阅的信息的拷贝

在copySubscription函数中进行的实现

try {

//获取订阅信息,主要来源有两个

//1.从本类的subscribe中可以获取

Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();

if (sub != null) {

for (final Map.Entry<String, String> entry : sub.entrySet()) {

final String topic = entry.getKey();

final String subString = entry.getValue();

SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),

topic, subString);

this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);

}

}

if (null == this.messageListenerInner) {

this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();

}

switch (this.defaultMQPushConsumer.getMessageModel()) {

case BROADCASTING:

break;

case CLUSTERING:

final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());

//构建重试主题消息,以消费组为主,消费者启动的时候自动订阅这个主题,参与其消息队列负载

SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),

retryTopic, SubscriptionData.SUB_ALL);

this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);

break;

default:

break;

}

} catch (Exception e) {

throw new MQClientException(“subscription exception”, e);

}

返回来继续进行初始化

//初始化Instance

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

//初始化RebalanceImple(消息重新实现类)

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());

this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());

this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

进行初始化偏移量

//初始化偏移量

if (this.defaultMQPushConsumer.getOffsetStore() != null) {

this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();

} else {

switch (this.defaultMQPushConsumer.getMessageModel()) {

//根据不同的消息消费模式来进行不同的初始化方式

case BROADCASTING:

//广播模式,保存在本地

this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

break;

case CLUSTERING:

//如果是集群模式,保存在远端

this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

break;

default:

break;

}

this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

}

//重载offset

this.offsetStore.load();

如果是消息消费是集群模式,那么消息进度保存在本地,集群模式保存在远端

//获取是否是顺序消费

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {

this.consumeOrderly = true;

this.consumeMessageService = //创建不同的Service

new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());

} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {

this.consumeOrderly = false;

this.consumeMessageService =

new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());

}

开启消息消费线程

this.consumeMessageService.start();

尝试注册消费者

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);

if (!registerOK) {

this.serviceState = ServiceState.CREATE_JUST;

this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());

throw new MQClientException(“The consumer group[” + this.defaultMQPushConsumer.getConsumerGroup()

+ “] has been created before, specify another name please.” + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),

null);

}

上面的mqClinetFactory JVM内唯一,用的是static

这就基本完成Service的start流程了,接下来是消息的正式拉取了

发表评论

邮箱地址不会被公开。 必填项已用*标注