我们基于了PUSH的方式,讲解拉取,因为Pull模式下,拉取更加的基础,从难倒易罢了
消息消费分为了广播模式和集群模式,对于广播模式来说,每一个消费者都会去拉取订阅主题下的所有消费队列的消息
集群模式下,同一个消费组内有多个消息消费者,存在着多个消费队列,如何进行负载均衡的呢?
这次我们进行学习,就是从
上面start的最后,mqClientFactory中的start开始
在mqClientFactory中
// Start pull service
this.pullMessageService.start(); |
在这个pullMessageService中,我们继承了ServiceThread
服务线程 ,通过run方法启动
@Override
public void run() { log.info(this.getServiceName() + ” service started”); while (!this.isStopped()) { try { //不断的从内部阻塞队列中拿取请求 PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error(“Pull Message Service Run Method exception”, e); } } log.info(this.getServiceName() + ” service end”); } |
在PullMessageService中,实现其实很简单就是不断的从阻塞队列中拿取数据
那么问题来了,PullRequest是何时放进去的呢?
因为pullRequestQueue是个内部属性,故放入的函数必然在本类之中
在PullMessageService中,提供了延迟添加和立即添加两种方式将PullRequest方法放入Queue中
根据上面的
executePullRequestImmediately,查找其调用
可以发现,从DefaultMQPushConsumer中可以看到
public void executePullRequestImmediately(final PullRequest pullRequest) {
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); } |
以及reblanceImpl中放入了queue
我们先说DefaultMQPushConsumer中的调用
从整体的入口类中看
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else { log.warn(“No matched consumer for the PullRequest {}, drop it”, pullRequest); } } |
从mqClientFactory中获取到ConsumerInner,然后转换一下
进行相对应的拉取
在继续走其调用之前,对于PullRequest的核心属性,我们需要进行一下了解
cprivate String consumerGroup;
private MessageQueue messageQueue; private ProcessQueue processQueue; private long nextOffset; private boolean lockedFirst = false; |
consumerGroup 消费者组
messageQueue 待消费队列
processQueue 消息处理队列,从Borker获取的数据先放到此队列,然后进行消费
nextOffset 待拉取的MessageQueue偏移量
lockedFirtst 是否被锁定
其中的processQueue,是Message存放的地方,因为PullMessageService默认每次拉取32条消息,在这个队列中,按照偏移量一次存放在其中,然后进行提交到消费者消费线程池,进行消息消费
这个ProcessQueue 也是封装后的数据结构,其内部包含属性如下
//控制下面的msgTreeMap,consumingMsgOrderlyTreeMap
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); //存储容器,键为消息在ConsumerQueue的偏移量,MessageExt 消息实体 private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); //ProcessQueue中总消息数 private final AtomicLong msgCount = new AtomicLong(); private final AtomicLong msgSize = new AtomicLong(); private final Lock lockConsume = new ReentrantLock(); /** * A subset of msgTreeMap, will only be used when orderly consume */ //消息临时存储容器,用于存储消息,在消费前,临时存储在这里面,也是为了处理顺序消息 private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>(); private final AtomicLong tryUnlockTimes = new AtomicLong(0); //最大队列偏移量 private volatile long queueOffsetMax = 0L; //当前Queue是否被抛弃 private volatile boolean dropped = false; //上一次消息拉取时间戳 private volatile long lastPullTimestamp = System.currentTimeMillis(); //上一次消息消费时间戳 private volatile long lastConsumeTimestamp = System.currentTimeMillis(); |
对于其包含的核心方法,主要有
public boolean isLockExpired()
锁是否过期 public boolean isPullExpired() 判断PullMessageService是否空闲 public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) 移除消费超时的消息. public boolean putMessage(final List<MessageExt> msgs) PullMessage拉取消息后,调用该方法讲消息添加到ProcessQueue public long getMaxSpan() 消息最大间隔,getMaxSpan()/20 并不能说明Procequeue包含的消息个数 public long removeMessage(final List<MessageExt> msgs) 移除消息 public void rollback() 将所有消息重新放入msgTreeMap,清除consumingMsgOrderlyTreeMap public long commit() 提交消息 public void makeMessageToConsumeAgain(List<MessageExt> msgs) 重新消费一批消息,将其从OrderMap中移除,并加入到msgTreeMap public List<MessageExt> takeMessages(final int batchSize) 取出batchSize条消息 |
说完了ProcessQueue的流程之后,依次看消息拉取的流程
整体分为了
1.客户端封装拉取消息请求
2.消息服务端查找并返回消息
3.消息拉取客户端处理返回的消息