消费者在消费完成之后,需要记录消费记录,不然不能让消费者从头开始消费吧
而不同的消费模式下,消费记录的存储位置和方式是不一致的
比如广播模式下,消费组的所有消息消费者都需要消费主题下所有的消息,就是同组内消费者的消息消费行为是对立的,彼此不影响,所以消费进度可以存储
但在集群模式下,同一个消费组内的所有消息消费者共享消息主题下的所有消息,同一条消息在同一时间只会被消费组内的一个消费者消费,并且随着消息队列的动态变化重新负载,所需要保存在一个每一个消费者都能访问到的地方
对于实际上,RMQ将消息存储抽取为一个接口OffsetStore
主要的接口如下
//加载到内存
void load() throws MQClientException; //更新内存的消息消费进度 void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly); //读取消息消费进度 long readOffset(final MessageQueue mq, final ReadOffsetType type); //持久化所有 void persistAll(final Set<MessageQueue> mqs); //持久化单个 void persist(final MessageQueue mq); //从内存中移除 void removeOffset(MessageQueue mq); //更新存储在Broker的消息消费进度 void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; |
首先,我们来说下广播模式下的消费进度存储
广播模式下,消息存储在本地,实现类为LocalFileOffsetStore
内部有一些重要的属性
//消息的存储目录
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty( “rocketmq.client.localOffsetStoreDir”, System.getProperty(“user.home”) + File.separator + “.rocketmq_offsets”); private final static InternalLogger log = ClientLogger.getLog(); //消息客户端 private final MQClientInstance mQClientFactory; //消息消费组 private final String groupName; //存储路径 private final String storePath; //内存中存着的消息消费进度 private ConcurrentMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>(); |
然后我们看下重要的函数 load
@Override
public void load() throws MQClientException { //首先读取了内存中的offset Map<MessageQueue,AtomicLong>, 封装为了Wrapper对象 //readLocalOffset中,从storePath中的读取,读取不到,会尝试读取.bak文件 OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset(); if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) { //逐个加载 offsetTable.putAll(offsetSerializeWrapper.getOffsetTable()); for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) { AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq); log.info(“load consumer’s offset, {} {} {}”, this.groupName, mq, offset.get()); } } } |
然后进行存储,即持久化
@Override
public void persistAll(Set<MessageQueue> mqs) { if (null == mqs || mqs.isEmpty()) return; //创建一个Wrapper包装类 OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper(); for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { if (mqs.contains(entry.getKey())) { AtomicLong offset = entry.getValue(); offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset); } } //转换为json String jsonString = offsetSerializeWrapper.toJson(true); if (jsonString != null) { try { //存入file MixAll.string2File(jsonString, this.storePath); } catch (IOException e) { log.error(“persistAll consumer offset Exception, ” + this.storePath, e); } } } |
持久化也是直接json存储在本地即可
从上面的角度来看,消息消费的更新和持久化的代码还是比较简单的,我们主要还是分析下集群模式下的进度管理
集群模式下的消息存储
这是存放在Brorker端的消息存储,消息消费进度的实现类是RemoteBrokerOffsetStore
内实现基本为下图
对于读取,持久化 广播模式实现细节 差不多 集群模式的消息进度如果从内存中消费,则是直接读取Map,如果是从硬盘中消费,则是发送网络请求,请求命令为QUERY_CONSUMER_OFFSET
持久化消息进度,那么就发发送命令UPDATE_CONSUMER_OFFSET
而Broker端默认10s进行一次持久化
存储到设定的目录下
那么对于Broker端的消息消费近端的存储,我们需要进行一些思考
比如并发消息如何进行保存的
消费者线程池每次处理完成一个消息消费任务时候会从ProcessQueue中移除这批消息
并返回ProcessQueue中最小的偏移量,用该偏移量更新消息队列消费进度,也就是更新消费年进度和消费任务中的消息没有关系
比如:task1 queueOffset 20,40
task2 50,70
并且ProcessQueue中包含最小消息偏移量为10的消息
task1和2消费完成后,都使用偏移量10来更新消费进度
如果10的消息返回了消费完成之后,将直接使用ProcessQueue最大的偏移量的进行更新
但是如果10的消息迟迟无法进行更新成功,会导致一直无法推进消息进度
于是RMQ引入了一个消息拉取流控机制,就是ProcessQueue中的最大消息偏移量和最小偏移量不能超过这个值,超过这个值,则触发流控 延迟这个消息队列的消息拉取