如果消息监听器返回的消费结果是RECONSUME_LATER,需要将消息发送给Broker延迟消息,如果发送ACK失败,那么就延迟5s后进行消费,客户端的发送ACK消息入口

MQClientAPIImpl#consumerSendMessageBack

封装的消息为 RequestCode.CONSUMER_SEND_MSG_BACK

然后封装为了ConsumerSendMsgBackRequestHeader

Header内部的属性包含

offset 物理偏移量

group 消费组名

delaylevel 延迟等级

originMsgId 消息Id

maxReconsumeTimes 最大消费次数,默认为16次

我们将Header封装好之后,将其同步的发送到服务端,服务器端会在Netty中注册Processor进行处理

即为SendMessageProcessor中处理

consumerSendMsgBack函数负责处理此功能

在其中,首先获取到BrokerConfig

//获取到消费组的订阅配置信息

SubscriptionGroupConfig subscriptionGroupConfig =

this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());

config类中的属性主要包含

//消费组名

private String groupName;

//是否可以消费

private boolean consumeEnable = true;

//是否允许从最小偏移量开始消费

private boolean consumeFromMinEnable = true;

//以广播模式消费

private boolean consumeBroadcastEnable = true;

//重试队列,默认为1

private int retryQueueNums = 1;

//最大重试次数

private int retryMaxTimes = 16;

private long brokerId = MixAll.MASTER_ID;

//如果消息阻塞 将转向 这个broker上拉取消息

private long whichBrokerWhenConsumeSlowly = 1;

//是否立刻开始重新负载

private boolean notifyConsumerIdsChangedEnable = true;

接下来,获取了config,如果为空,直接返回

//如果为空,则设置response为订阅组不存在,并直接返回

if (null == subscriptionGroupConfig) {

response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);

response.setRemark(“subscription group not exist, ” + requestHeader.getGroup() + ” ”

+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));

return CompletableFuture.completedFuture(response);

}

if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {

response.setCode(ResponseCode.NO_PERMISSION);

response.setRemark(“the broker[” + this.brokerController.getBrokerConfig().getBrokerIP1() + “] sending message is forbidden”);

return CompletableFuture.completedFuture(response);

}

接下来检查配置的消息重试队列数量,如果小于0,说明不支持重试,直接返回

//如果重试的队列小于0,说明不支持重试,直接返回成功

if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

return CompletableFuture.completedFuture(response);

}

构建新的topic,方便构建新的config

//构建新的topic

String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());

//获取到queueInt id,方便构建config

int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

int topicSysFlag = 0;

if (requestHeader.isUnitMode()) {

topicSysFlag = TopicSysFlag.buildSysFlag(false, true);

}

构建新的config

//构建topicConfig

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(

newTopic,

subscriptionGroupConfig.getRetryQueueNums(),

PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);

if (null == topicConfig) {

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark(“topic[” + newTopic + “] not exist”);

return CompletableFuture.completedFuture(response);

}

if (!PermName.isWriteable(topicConfig.getPerm())) {

response.setCode(ResponseCode.NO_PERMISSION);

response.setRemark(String.format(“the topic[%s] sending message is forbidden”, newTopic));

return CompletableFuture.completedFuture(response);

}

尝试获取到对应的消息

//获取到对应的消息

MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());

//如果消息为null,别说了,返回

if (null == msgExt) {

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark(“look message by offset failed, ” + requestHeader.getOffset());

return CompletableFuture.completedFuture(response);

}

更新获取此消息的重试topic

//获取到此消息的重试topic

final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);

if (null == retryTopic) {

//为空,则将这个topic放进去

MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());

}

重试次数超过最大重试次数的处理逻辑

//如果已经重试次数超过了

if (msgExt.getReconsumeTimes() >= maxReconsumeTimes

|| delayLevel < 0) {

//获取DLQ队列名册个

newTopic = MixAll.getDLQTopic(requestHeader.getGroup());

queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

//打入冷宫,不得录用

topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,

DLQ_NUMS_PER_GROUP,

PermName.PERM_WRITE, 0);

if (null == topicConfig) {

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark(“topic[” + newTopic + “] not exist”);

return CompletableFuture.completedFuture(response);

}

新建一个Msg对象

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();

msgInner.setTopic(newTopic);

msgInner.setBody(msgExt.getBody());

msgInner.setFlag(msgExt.getFlag());

MessageAccessor.setProperties(msgInner, msgExt.getProperties());

msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

msgInner.setQueueId(queueIdInt);

msgInner.setSysFlag(msgExt.getSysFlag());

msgInner.setBornTimestamp(msgExt.getBornTimestamp());

msgInner.setBornHost(msgExt.getBornHost());

msgInner.setStoreHost(msgExt.getStoreHost());

msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

String originMsgId = MessageAccessor.getOriginMessageId(msgExt);

MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);

存储msg,根据存储的结果返回

return putMessageResult.thenApply((r) -> {

if (r != null) {

switch (r.getPutMessageStatus()) {

case PUT_OK:

String backTopic = msgExt.getTopic();

String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);

if (correctTopic != null) {

backTopic = correctTopic;

}

this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

return response;

default:

break;

}

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark(r.getPutMessageStatus().name());

return response;

}

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark(“putMessageResult is null”);

return response;

});

这样就是在Broker上的ACK的整体流程

发表评论

邮箱地址不会被公开。 必填项已用*标注