如果消息监听器返回的消费结果是RECONSUME_LATER,需要将消息发送给Broker延迟消息,如果发送ACK失败,那么就延迟5s后进行消费,客户端的发送ACK消息入口
MQClientAPIImpl#consumerSendMessageBack
封装的消息为 RequestCode.CONSUMER_SEND_MSG_BACK
然后封装为了ConsumerSendMsgBackRequestHeader
Header内部的属性包含
offset 物理偏移量
group 消费组名
delaylevel 延迟等级
originMsgId 消息Id
maxReconsumeTimes 最大消费次数,默认为16次
我们将Header封装好之后,将其同步的发送到服务端,服务器端会在Netty中注册Processor进行处理
即为SendMessageProcessor中处理
consumerSendMsgBack函数负责处理此功能
在其中,首先获取到BrokerConfig
//获取到消费组的订阅配置信息
SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()); |
config类中的属性主要包含
//消费组名
private String groupName; //是否可以消费 private boolean consumeEnable = true; //是否允许从最小偏移量开始消费 private boolean consumeFromMinEnable = true; //以广播模式消费 private boolean consumeBroadcastEnable = true; //重试队列,默认为1 private int retryQueueNums = 1; //最大重试次数 private int retryMaxTimes = 16; private long brokerId = MixAll.MASTER_ID; //如果消息阻塞 将转向 这个broker上拉取消息 private long whichBrokerWhenConsumeSlowly = 1; //是否立刻开始重新负载 private boolean notifyConsumerIdsChangedEnable = true; |
接下来,获取了config,如果为空,直接返回
//如果为空,则设置response为订阅组不存在,并直接返回
if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark(“subscription group not exist, ” + requestHeader.getGroup() + ” ” + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); return CompletableFuture.completedFuture(response); } if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark(“the broker[” + this.brokerController.getBrokerConfig().getBrokerIP1() + “] sending message is forbidden”); return CompletableFuture.completedFuture(response); } |
接下来检查配置的消息重试队列数量,如果小于0,说明不支持重试,直接返回
//如果重试的队列小于0,说明不支持重试,直接返回成功
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return CompletableFuture.completedFuture(response); } |
构建新的topic,方便构建新的config
//构建新的topic
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); //获取到queueInt id,方便构建config int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums(); int topicSysFlag = 0; if (requestHeader.isUnitMode()) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } |
构建新的config
//构建topicConfig
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( newTopic, subscriptionGroupConfig.getRetryQueueNums(), PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(“topic[” + newTopic + “] not exist”); return CompletableFuture.completedFuture(response); } if (!PermName.isWriteable(topicConfig.getPerm())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark(String.format(“the topic[%s] sending message is forbidden”, newTopic)); return CompletableFuture.completedFuture(response); } |
尝试获取到对应的消息
//获取到对应的消息
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset()); //如果消息为null,别说了,返回 if (null == msgExt) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(“look message by offset failed, ” + requestHeader.getOffset()); return CompletableFuture.completedFuture(response); } |
更新获取此消息的重试topic
//获取到此消息的重试topic
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); if (null == retryTopic) { //为空,则将这个topic放进去 MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic()); } |
重试次数超过最大重试次数的处理逻辑
//如果已经重试次数超过了
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { //获取DLQ队列名册个 newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; //打入冷宫,不得录用 topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(“topic[” + newTopic + “] not exist”); return CompletableFuture.completedFuture(response); } |
新建一个Msg对象
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic); msgInner.setBody(msgExt.getBody()); msgInner.setFlag(msgExt.getFlag()); MessageAccessor.setProperties(msgInner, msgExt.getProperties()); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags())); msgInner.setQueueId(queueIdInt); msgInner.setSysFlag(msgExt.getSysFlag()); msgInner.setBornTimestamp(msgExt.getBornTimestamp()); msgInner.setBornHost(msgExt.getBornHost()); msgInner.setStoreHost(msgExt.getStoreHost()); msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1); String originMsgId = MessageAccessor.getOriginMessageId(msgExt); MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId); CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); |
存储msg,根据存储的结果返回
return putMessageResult.thenApply((r) -> {
if (r != null) { switch (r.getPutMessageStatus()) { case PUT_OK: String backTopic = msgExt.getTopic(); String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); if (correctTopic != null) { backTopic = correctTopic; } this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; default: break; } response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(r.getPutMessageStatus().name()); return response; } response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(“putMessageResult is null”); return response; }); |
这样就是在Broker上的ACK的整体流程