我们基于了PUSH的方式,讲解拉取,因为Pull模式下,拉取更加的基础,从难倒易罢了

消息消费分为了广播模式和集群模式,对于广播模式来说,每一个消费者都会去拉取订阅主题下的所有消费队列的消息

集群模式下,同一个消费组内有多个消息消费者,存在着多个消费队列,如何进行负载均衡的呢?

这次我们进行学习,就是从

上面start的最后,mqClientFactory中的start开始

在mqClientFactory中

// Start pull service

this.pullMessageService.start();

在这个pullMessageService中,我们继承了ServiceThread

服务线程 ,通过run方法启动

@Override

public void run() {

log.info(this.getServiceName() + ” service started”);

while (!this.isStopped()) {

try {

//不断的从内部阻塞队列中拿取请求

PullRequest pullRequest = this.pullRequestQueue.take();

this.pullMessage(pullRequest);

} catch (InterruptedException ignored) {

} catch (Exception e) {

log.error(“Pull Message Service Run Method exception”, e);

}

}

log.info(this.getServiceName() + ” service end”);

}

在PullMessageService中,实现其实很简单就是不断的从阻塞队列中拿取数据

那么问题来了,PullRequest是何时放进去的呢?

因为pullRequestQueue是个内部属性,故放入的函数必然在本类之中

图片

在PullMessageService中,提供了延迟添加和立即添加两种方式将PullRequest方法放入Queue中

根据上面的

executePullRequestImmediately,查找其调用

可以发现,从DefaultMQPushConsumer中可以看到

public void executePullRequestImmediately(final PullRequest pullRequest) {

this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);

}

以及reblanceImpl中放入了queue

我们先说DefaultMQPushConsumer中的调用

从整体的入口类中看

private void pullMessage(final PullRequest pullRequest) {

final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

if (consumer != null) {

DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;

impl.pullMessage(pullRequest);

} else {

log.warn(“No matched consumer for the PullRequest {}, drop it”, pullRequest);

}

}

从mqClientFactory中获取到ConsumerInner,然后转换一下

进行相对应的拉取

在继续走其调用之前,对于PullRequest的核心属性,我们需要进行一下了解

cprivate String consumerGroup;

private MessageQueue messageQueue;

private ProcessQueue processQueue;

private long nextOffset;

private boolean lockedFirst = false;

consumerGroup 消费者组

messageQueue 待消费队列

processQueue 消息处理队列,从Borker获取的数据先放到此队列,然后进行消费

nextOffset 待拉取的MessageQueue偏移量

lockedFirtst 是否被锁定

其中的processQueue,是Message存放的地方,因为PullMessageService默认每次拉取32条消息,在这个队列中,按照偏移量一次存放在其中,然后进行提交到消费者消费线程池,进行消息消费

这个ProcessQueue 也是封装后的数据结构,其内部包含属性如下

//控制下面的msgTreeMap,consumingMsgOrderlyTreeMap

private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();

//存储容器,键为消息在ConsumerQueue的偏移量,MessageExt 消息实体

private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

//ProcessQueue中总消息数

private final AtomicLong msgCount = new AtomicLong();

private final AtomicLong msgSize = new AtomicLong();

private final Lock lockConsume = new ReentrantLock();

/**

* A subset of msgTreeMap, will only be used when orderly consume

*/

//消息临时存储容器,用于存储消息,在消费前,临时存储在这里面,也是为了处理顺序消息

private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();

private final AtomicLong tryUnlockTimes = new AtomicLong(0);

//最大队列偏移量

private volatile long queueOffsetMax = 0L;

//当前Queue是否被抛弃

private volatile boolean dropped = false;

//上一次消息拉取时间戳

private volatile long lastPullTimestamp = System.currentTimeMillis();

//上一次消息消费时间戳

private volatile long lastConsumeTimestamp = System.currentTimeMillis();

对于其包含的核心方法,主要有

public boolean isLockExpired()

锁是否过期

public boolean isPullExpired()

判断PullMessageService是否空闲

public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer)

移除消费超时的消息.

public boolean putMessage(final List<MessageExt> msgs)

PullMessage拉取消息后,调用该方法讲消息添加到ProcessQueue

public long getMaxSpan()

消息最大间隔,getMaxSpan()/20 并不能说明Procequeue包含的消息个数

public long removeMessage(final List<MessageExt> msgs)

移除消息

public void rollback()

将所有消息重新放入msgTreeMap,清除consumingMsgOrderlyTreeMap

public long commit()

提交消息

public void makeMessageToConsumeAgain(List<MessageExt> msgs)

重新消费一批消息,将其从OrderMap中移除,并加入到msgTreeMap

public List<MessageExt> takeMessages(final int batchSize)

取出batchSize条消息

说完了ProcessQueue的流程之后,依次看消息拉取的流程

整体分为了

1.客户端封装拉取消息请求

2.消息服务端查找并返回消息

3.消息拉取客户端处理返回的消息

发表评论

邮箱地址不会被公开。 必填项已用*标注