消费者在消费完成之后,需要记录消费记录,不然不能让消费者从头开始消费吧

而不同的消费模式下,消费记录的存储位置和方式是不一致的

比如广播模式下,消费组的所有消息消费者都需要消费主题下所有的消息,就是同组内消费者的消息消费行为是对立的,彼此不影响,所以消费进度可以存储

但在集群模式下,同一个消费组内的所有消息消费者共享消息主题下的所有消息,同一条消息在同一时间只会被消费组内的一个消费者消费,并且随着消息队列的动态变化重新负载,所需要保存在一个每一个消费者都能访问到的地方

对于实际上,RMQ将消息存储抽取为一个接口OffsetStore

主要的接口如下

//加载到内存

void load() throws MQClientException;

//更新内存的消息消费进度

void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);

//读取消息消费进度

long readOffset(final MessageQueue mq, final ReadOffsetType type);

//持久化所有

void persistAll(final Set<MessageQueue> mqs);

//持久化单个

void persist(final MessageQueue mq);

//从内存中移除

void removeOffset(MessageQueue mq);

//更新存储在Broker的消息消费进度

void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,

MQBrokerException, InterruptedException, MQClientException;

首先,我们来说下广播模式下的消费进度存储

广播模式下,消息存储在本地,实现类为LocalFileOffsetStore

内部有一些重要的属性

//消息的存储目录

public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(

“rocketmq.client.localOffsetStoreDir”,

System.getProperty(“user.home”) + File.separator + “.rocketmq_offsets”);

private final static InternalLogger log = ClientLogger.getLog();

//消息客户端

private final MQClientInstance mQClientFactory;

//消息消费组

private final String groupName;

//存储路径

private final String storePath;

//内存中存着的消息消费进度

private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =

new ConcurrentHashMap<MessageQueue, AtomicLong>();

然后我们看下重要的函数 load

@Override

public void load() throws MQClientException {

//首先读取了内存中的offset Map<MessageQueue,AtomicLong>, 封装为了Wrapper对象

//readLocalOffset中,从storePath中的读取,读取不到,会尝试读取.bak文件

OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();

if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {

//逐个加载

offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());

for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {

AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);

log.info(“load consumer’s offset, {} {} {}”,

this.groupName,

mq,

offset.get());

}

}

}

然后进行存储,即持久化

@Override

public void persistAll(Set<MessageQueue> mqs) {

if (null == mqs || mqs.isEmpty())

return;

//创建一个Wrapper包装类

OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();

for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {

if (mqs.contains(entry.getKey())) {

AtomicLong offset = entry.getValue();

offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);

}

}

//转换为json

String jsonString = offsetSerializeWrapper.toJson(true);

if (jsonString != null) {

try {

//存入file

MixAll.string2File(jsonString, this.storePath);

} catch (IOException e) {

log.error(“persistAll consumer offset Exception, ” + this.storePath, e);

}

}

}

持久化也是直接json存储在本地即可

从上面的角度来看,消息消费的更新和持久化的代码还是比较简单的,我们主要还是分析下集群模式下的进度管理

集群模式下的消息存储

这是存放在Brorker端的消息存储,消息消费进度的实现类是RemoteBrokerOffsetStore

内实现基本为下图

对于读取,持久化 广播模式实现细节 差不多 集群模式的消息进度如果从内存中消费,则是直接读取Map,如果是从硬盘中消费,则是发送网络请求,请求命令为QUERY_CONSUMER_OFFSET

持久化消息进度,那么就发发送命令UPDATE_CONSUMER_OFFSET

而Broker端默认10s进行一次持久化

存储到设定的目录下

那么对于Broker端的消息消费近端的存储,我们需要进行一些思考

比如并发消息如何进行保存的

消费者线程池每次处理完成一个消息消费任务时候会从ProcessQueue中移除这批消息

并返回ProcessQueue中最小的偏移量,用该偏移量更新消息队列消费进度,也就是更新消费年进度和消费任务中的消息没有关系

比如:task1 queueOffset 20,40

task2 50,70

并且ProcessQueue中包含最小消息偏移量为10的消息

task1和2消费完成后,都使用偏移量10来更新消费进度

如果10的消息返回了消费完成之后,将直接使用ProcessQueue最大的偏移量的进行更新

但是如果10的消息迟迟无法进行更新成功,会导致一直无法推进消息进度

于是RMQ引入了一个消息拉取流控机制,就是ProcessQueue中的最大消息偏移量和最小偏移量不能超过这个值,超过这个值,则触发流控 延迟这个消息队列的消息拉取

发表评论

邮箱地址不会被公开。 必填项已用*标注