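RocketMQ (RMQ) transactions are controlled by the producer: the producer sends a transactional message to the Broker, and the producer then decides whether the message is actually committed and made available to consumers.

The design is based on two-phase commit.

The application first sends a prepare (half) message. Once it has been sent successfully, RMQ stores it under the topic RMQ_SYS_TRANS_HALF_TOPIC.

RMQ also runs a scheduled task that consumes the messages in RMQ_SYS_TRANS_HALF_TOPIC and sends a check-back request to the producer; the application's check-back interface returns the transaction state, i.e. whether to commit or roll back.

Let's walk through the code for this flow.

On the producer side, transactional messages are implemented by TransactionMQProducer.

Internally it holds two important objects: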

private ExecutorService executorService;

private TransactionListener transactionListener;

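Of these two objects:

transactionListener is the transaction listener; it implements two main operations: executing the local transaction and answering transaction check-backs.

executorService is the thread pool on which transaction state check-backs are executed asynchronously.

TransactionListener exposes two methods: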

LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

LocalTransactionState checkLocalTransaction(final MessageExt msg);

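The first, executeLocalTransaction, executes the local transaction; the second, checkLocalTransaction, answers the Broker's check-back on the state of a transactional message.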
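To make the two callbacks concrete, here is a minimal sketch of a listener that records the outcome of each local transaction so that a later check-back can be answered. It does not use the RocketMQ classes: `LocalTxState`, `SimpleTransactionListener` and `OrderTransactionListener` are hypothetical stand-ins, messages are reduced to a transaction id, and a `Runnable` passed as `arg` stands in for the local business work.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for RocketMQ's LocalTransactionState (same three constant names).
enum LocalTxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Simplified stand-in for TransactionListener: a message is reduced to its transaction id.
interface SimpleTransactionListener {
    LocalTxState executeLocalTransaction(String transactionId, Object arg);

    LocalTxState checkLocalTransaction(String transactionId);
}

// Hypothetical listener that remembers the outcome of each local transaction.
class OrderTransactionListener implements SimpleTransactionListener {
    private final Map<String, LocalTxState> outcomes = new ConcurrentHashMap<>();

    @Override
    public LocalTxState executeLocalTransaction(String transactionId, Object arg) {
        LocalTxState state;
        try {
            // "arg" is assumed to be a Runnable that performs the local work.
            ((Runnable) arg).run();
            state = LocalTxState.COMMIT_MESSAGE;
        } catch (RuntimeException e) {
            // Any runtime failure of the local work means the message must not be delivered.
            state = LocalTxState.ROLLBACK_MESSAGE;
        }
        outcomes.put(transactionId, state);
        return state;
    }

    @Override
    public LocalTxState checkLocalTransaction(String transactionId) {
        // No record means the outcome is not known yet, so report UNKNOW.
        return outcomes.getOrDefault(transactionId, LocalTxState.UNKNOW);
    }
}
```

In a real application the outcome would normally be read from a durable store such as the business database, since an in-memory map is lost when the producer restarts.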

With the basic members covered, let's focus on the code.

The entry point is sendMessageInTransaction on the producer:

@Override

public TransactionSendResult sendMessageInTransaction(final Message msg,

final Object arg) throws MQClientException {

if (null == this.transactionListener) {

throw new MQClientException("TransactionListener is null", null);

}

msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));

return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);

}

This only checks that a listener exists and throws an exception if it does not.

Next comes the send flow,

in sendMessageInTransaction of DefaultMQProducerImpl:

// Check the listener again

TransactionListener transactionListener = getCheckListener();

if (null == localTransactionExecuter && null == transactionListener) {

throw new MQClientException("tranExecutor is null", null);

}

// ignore DelayTimeLevel parameter

if (msg.getDelayTimeLevel() != 0) {

MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

}

Validators.checkMessage(msg, this.defaultMQProducer);

SendResult sendResult = null;

// Set a message property that marks this as a transactional (prepared) message

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");

// Set the producer group

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

try {

// Send synchronously to RMQ

sendResult = this.send(msg);

} catch (Exception e) {

throw new MQClientException("send message Exception", e);

}

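Next, the method performs follow-up work based on the send result:

// Inspect the send result

switch (sendResult.getSendStatus()) {

// The half message was sent successfully

case SEND_OK: {

try {

// Record the transaction id as a message property

if (sendResult.getTransactionId() != null) {

msg.putUserProperty("__transactionId__", sendResult.getTransactionId());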

}

String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);

if (null != transactionId && !"".equals(transactionId)) {

msg.setTransactionId(transactionId);

}

if (null != localTransactionExecuter) {

localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);

} else if (transactionListener != null) {

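log.debug("Used new transaction API");

// Call the listener's executeLocalTransaction. Because the Broker may check back later,

// the local side should keep a unique identifier so that the transaction state can be looked up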

localTransactionState = transactionListener.executeLocalTransaction(msg, arg);

}

if (null == localTransactionState) {

// No state was returned, so treat it as unknown

localTransactionState = LocalTransactionState.UNKNOW;

}

if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {

log.info("executeLocalTransactionBranch return {}", localTransactionState);

log.info(msg.toString());

}

} catch (Throwable e) {

log.info("executeLocalTransactionBranch exception", e);

log.info(msg.toString());

localException = e;

}

}

break;

case FLUSH_DISK_TIMEOUT:

case FLUSH_SLAVE_TIMEOUT:

case SLAVE_NOT_AVAILABLE:

// The send failed, so mark the message for rollback

localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;

break;

default:

break;

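Depending on whether the send succeeded or failed, a different transaction state is recorded.

try {

// End the transaction according to the state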

this.endTransaction(sendResult, localTransactionState, localException);

} catch (Exception e) {

log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);

}

Finally, the transaction is ended according to the state.

The possible states are:

COMMIT_MESSAGE,

ROLLBACK_MESSAGE,

UNKNOW

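They mean commit, rollback, and unknown, respectively.

endTransaction does not commit anything on the producer itself; it reports the local transaction state to the Broker. If that state is UNKNOW, the Broker leaves the half message in place and resolves it later through a check-back.

That is the end of the send flow on the producer side.

Next, let's see how the Broker accepts the Prepare message.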

String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);

if (traFlag != null && Boolean.parseBoolean(traFlag)

&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1

if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {

response.setCode(ResponseCode.NO_PERMISSION);

response.setRemark(

"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()

+ "] sending transaction message is forbidden");

return response;

}

putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);

If the message properties mark it as a Prepared message, it is handed to prepareMessage().

The call chain ends up in putHalfMessage, which re-wraps the message:

// Re-wrap the transactional message

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {

// Back up the real topic

MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());

// Back up the real queue id

MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,

String.valueOf(msgInner.getQueueId()));

msgInner.setSysFlag(

MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));

// Switch the topic to the half topic

msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());

msgInner.setQueueId(0);

msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

return msgInner;

}

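As the code shows, the original topic is backed up and the topic is then set to RMQ_SYS_TRANS_HALF_TOPIC.

The message is then written to the commitLog. Because its topic has been changed, the transactional message is not delivered to consumers.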
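The backup-and-redirect step can be sketched without any RocketMQ classes. In the example below, `SketchMessage` and `HalfMessageSketch` are hypothetical, the property keys are illustrative placeholders rather than the real `MessageConst` values, and `restore` is only an approximation of what happens to the message on commit.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for a broker-side message: topic, queue id and string properties.
class SketchMessage {
    String topic;
    int queueId;
    final Map<String, String> properties = new HashMap<>();

    SketchMessage(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

class HalfMessageSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Illustrative property keys (assumption); the real keys are defined in MessageConst.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";

    // Back up the original destination, then redirect the message to the half topic.
    static SketchMessage toHalf(SketchMessage msg) {
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QUEUE_ID, String.valueOf(msg.queueId));
        msg.topic = HALF_TOPIC;
        msg.queueId = 0;
        return msg;
    }

    // On commit, build a new message aimed at the original destination.
    static SketchMessage restore(SketchMessage half) {
        SketchMessage real = new SketchMessage(
            half.properties.get(REAL_TOPIC),
            Integer.parseInt(half.properties.get(REAL_QUEUE_ID)));
        real.properties.putAll(half.properties);
        real.properties.remove(REAL_TOPIC);
        real.properties.remove(REAL_QUEUE_ID);
        return real;
    }
}
```

Because consumers subscribe to the original topic, a message parked under the half topic is simply never matched by their subscriptions until a restored copy is written.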

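In addition, RMQ consumes this topic on a timer.

The next question is how a transaction is committed or rolled back.

First, the producer actively ends the transaction,

in endTransaction of DefaultMQProducerImpl: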

public void endTransaction(

final SendResult sendResult,

final LocalTransactionState localTransactionState,

final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {

final MessageId id;

if (sendResult.getOffsetMsgId() != null) {

id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());

} else {

id = MessageDecoder.decodeMessageId(sendResult.getMsgId());

}

String transactionId = sendResult.getTransactionId();

// Look up the Broker address

final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());

EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();

requestHeader.setTransactionId(transactionId);

requestHeader.setCommitLogOffset(id.getOffset());

// Branch on the local transaction state

switch (localTransactionState) {

case COMMIT_MESSAGE:

// Tell the Broker, through requestHeader, whether to commit or roll back

requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);

break;

case ROLLBACK_MESSAGE:

requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);

break;

case UNKNOW:

requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);

break;

default:

break;

Once the request has been sent, it is handled on the Broker by EndTransactionProcessor, which is registered with the Netty server.
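The switch above maps a local transaction state to a flag value. The sketch below shows that mapping and the bit manipulation behind resetTransactionValue. The numeric constants follow the layout of MessageSysFlag as I understand it (two bits at positions 2 and 3); treat them as an assumption and verify them against your RocketMQ version. `TxStateSketch` and `SysFlagSketch` are hypothetical names.

```java
// Stand-in for LocalTransactionState.
enum TxStateSketch { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

class SysFlagSketch {
    // Assumed values, mirroring the transaction bits of MessageSysFlag.
    static final int NOT_TYPE = 0;
    static final int PREPARED_TYPE = 0x1 << 2;
    static final int COMMIT_TYPE = 0x2 << 2;
    static final int ROLLBACK_TYPE = 0x3 << 2; // doubles as the mask for both transaction bits

    // Same mapping as the switch in endTransaction.
    static int toCommitOrRollback(TxStateSketch state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return COMMIT_TYPE;
            case ROLLBACK_MESSAGE:
                return ROLLBACK_TYPE;
            default:
                return NOT_TYPE;
        }
    }

    // Clear the two transaction bits, then set them to the given type.
    static int resetTransactionValue(int flag, int type) {
        return (flag & ~ROLLBACK_TYPE) | type;
    }

    // Extract only the transaction bits from a sys flag.
    static int getTransactionValue(int flag) {
        return flag & ROLLBACK_TYPE;
    }
}
```

Keeping the transaction type in two dedicated bits lets the Broker change it without disturbing the other bits of the sys flag.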

The processRequest method of EndTransactionProcessor is:

@Override

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws

RemotingCommandException {

final RemotingCommand response = RemotingCommand.createResponseCommand(null);

final EndTransactionRequestHeader requestHeader =

(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);

LOGGER.debug("Transaction request:{}", requestHeader);

if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {

response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);

LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");

return response;

}

// Branch on the fromTransactionCheck flag parsed from the header

if (requestHeader.getFromTransactionCheck()) {

switch (requestHeader.getCommitOrRollback()) {

// For the UNKNOW (pending) type, return immediately

case MessageSysFlag.TRANSACTION_NOT_TYPE: {

LOGGER.warn("Check producer[{}] transaction state, but it's pending status."

+ "RequestHeader: {} Remark: {}",

RemotingHelper.parseChannelRemoteAddr(ctx.channel()),

requestHeader.toString(),

request.getRemark());

return null;

}

case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {

LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."

+ "RequestHeader: {} Remark: {}",

RemotingHelper.parseChannelRemoteAddr(ctx.channel()),

requestHeader.toString(),

request.getRemark());

break;

}

case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {

LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."

+ "RequestHeader: {} Remark: {}",

RemotingHelper.parseChannelRemoteAddr(ctx.channel()),

requestHeader.toString(),

request.getRemark());

break;

}

default:

return null;

}

} else {

switch (requestHeader.getCommitOrRollback()) {

case MessageSysFlag.TRANSACTION_NOT_TYPE: {

LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."

+ "RequestHeader: {} Remark: {}",

RemotingHelper.parseChannelRemoteAddr(ctx.channel()),

requestHeader.toString(),

request.getRemark());

return null;

}

case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {

break;

}

case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {

LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."

+ "RequestHeader: {} Remark: {}",

RemotingHelper.parseChannelRemoteAddr(ctx.channel()),

requestHeader.toString(),

request.getRemark());

break;

}

default:

return null;

}

}

// At this point the request is either a commit or a rollback

OperationResult result = new OperationResult();

if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {

// Commit: first fetch the stored half message

result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);

if (result.getResponseCode() == ResponseCode.SUCCESS) {

RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);

if (res.getCode() == ResponseCode.SUCCESS) {

// Restore the original message

MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());

msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));

msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());

msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());

msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());

MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);

// Write the restored message to the commitLog

RemotingCommand sendResult = sendFinalMessage(msgInner);

// If that succeeded

if (sendResult.getCode() == ResponseCode.SUCCESS) {

// Mark the half message as removed by storing a record in RMQ_SYS_TRANS_OP_HALF_TOPIC

this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());

}

return sendResult;

}

return res;

}

} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {

// Rollback: fetch the stored half message as well

result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);

if (result.getResponseCode() == ResponseCode.SUCCESS) {

RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);

if (res.getCode() == ResponseCode.SUCCESS) {

// Here the half message is only marked as removed; no restored message is written to the commitLog

this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());

}

return res;

}

}

response.setCode(result.getResponseCode());

response.setRemark(result.getResponseRemark());

return response;

}

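To summarize the flow:

the Broker first fetches the previously stored half message;

on commit it writes a new, restored message to the commitLog, while on rollback it writes no restored message;

in both cases a record is then stored in a separate topic, RMQ_SYS_TRANS_OP_HALF_TOPIC, to mark the half message as processed.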
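The summary above can be modelled with a small in-memory sketch. This is not the Broker's storage code: `BrokerEndTxSketch` is a hypothetical class in which a map stands in for the half topic, one list for the messages visible to consumers, and another list for the op records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BrokerEndTxSketch {
    private final Map<Long, String> halfMessages = new HashMap<>();      // half offset -> payload
    private final List<String> visibleToConsumers = new ArrayList<>();   // restored (committed) messages
    private final List<Long> opRecords = new ArrayList<>();              // half offsets already resolved
    private long nextHalfOffset = 0;

    // Phase one: park the payload as a half message and return its offset.
    long prepare(String payload) {
        halfMessages.put(nextHalfOffset, payload);
        return nextHalfOffset++;
    }

    // Phase two: commit or roll back the half message stored at halfOffset.
    boolean endTransaction(long halfOffset, boolean commit) {
        String payload = halfMessages.get(halfOffset);
        if (payload == null || opRecords.contains(halfOffset)) {
            return false; // unknown half message, or already resolved
        }
        if (commit) {
            visibleToConsumers.add(payload); // only a commit makes the message consumable
        }
        opRecords.add(halfOffset); // commit and rollback both leave an op record
        return true;
    }

    List<String> visible() {
        return new ArrayList<>(visibleToConsumers);
    }

    List<Long> ops() {
        return new ArrayList<>(opRecords);
    }
}
```

Note that the half message itself is never modified or deleted in this model; it is the presence of an op record that marks it as done, which matches the append-only nature of the commitLog.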

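Next comes the check-back of the transaction state.

In RMQ, the TransactionalMessageCheckService thread periodically scans the messages in RMQ_SYS_TRANS_HALF_TOPIC to determine each message's state and decide whether it should be committed or rolled back.

The check runs once per minute by default, and the interval can be changed in the configuration file.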

@Override

protected void onWaitEnd() {

// Called each time the wait period ends

long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();

int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();

// The transaction timeout and the maximum number of check-backs are now known

long begin = System.currentTimeMillis();

log.info("Begin to check prepare message, begin time:{}", begin);

// Start the check

this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());

log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);

}

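In the onWaitEnd code above,

we obtain the transaction timeout and the maximum number of check-backs,

and then run the check, whose logic is as follows:

TransactionalMessageService#check()

// Get the topic

String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;

// First fetch the MessageQueues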

Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);

if (msgQueues == null || msgQueues.size() == 0) {

log.warn("The queue of topic is empty :" + topic);

return;

}

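All MessageQueues under the topic are fetched, and each one is then processed:

for (MessageQueue messageQueue : msgQueues) {

// Iterate over the MessageQueues

long startTime = System.currentTimeMillis();

// Remember that transaction handling uses two queues: one holding unprocessed (half) messages and one recording processed ones

// Here the processed (op) queue is derived from the half queue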

MessageQueue opQueue = getOpQueue(messageQueue);

long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);

long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);

log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);

if (halfOffset < 0 || opOffset < 0) {

log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,

halfOffset, opOffset);

continue;

}

The op queue that corresponds to the half messageQueue is now known.

List<Long> doneOpOffset = new ArrayList<>();

HashMap<Long, Long> removeMap = new HashMap<>();

// Starting from the current progress, pull 32 messages from the op queue and work out which half messages have already been processed

PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);

if (null == pullResult) {

log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",

messageQueue, halfOffset, opOffset);

continue;

}

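32 messages are pulled from the op queue so that they can be compared against the half messages.

Before processing, a few local variables are declared:

// single thread

// Number of times an empty message has been fetched

int getMessageNullCount = 1;

// Latest progress on the half topic

long newOffset = halfOffset;

// Current offset in the half queue

long i = halfOffset;

Then the processing starts:

// This half message has already been processed

if (removeMap.containsKey(i)) {

log.info("Half offset {} has been committed/rolled back", i);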

Long removedOpOffset = removeMap.remove(i);

doneOpOffset.add(removedOpOffset);

If the half message has already been processed, it is removed from removeMap and the scan moves on to the next offset.
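The role of removeMap and doneOpOffset during the scan can be illustrated with a simplified loop. `HalfScanSketch` and `pendingOffsets` are hypothetical; the real check() method does considerably more per message (time checks, re-appending, offset bookkeeping), so this only shows how resolved half offsets are filtered out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class HalfScanSketch {
    // removeMap: half offset -> offset of the op record that marks it as resolved.
    // Walks half offsets in [from, toExclusive) and returns those still unresolved.
    // Resolved entries are removed from removeMap and their op offsets collected in doneOpOffset.
    static List<Long> pendingOffsets(long from, long toExclusive,
                                     Map<Long, Long> removeMap, List<Long> doneOpOffset) {
        List<Long> pending = new ArrayList<>();
        for (long i = from; i < toExclusive; i++) {
            Long opOffset = removeMap.remove(i);
            if (opOffset != null) {
                doneOpOffset.add(opOffset); // already committed or rolled back
            } else {
                pending.add(i);             // still needs a decision
            }
        }
        return pending;
    }
}
```

Collecting the op offsets of resolved messages is what later allows the consume offset of the op queue to advance as well.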

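Next come the messages that have not been processed; we need to decide whether they are due for a check-back.

// Now the timing checks

// Current time minus the message's born timestamp

long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();

// checkImmunityTime: the immunity period. After a prepare message is sent, the producer is given some time to commit on its own, and no check-back happens during it

// transactionTimeout: the transaction timeout, after which a check-back is due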

long checkImmunityTime = transactionTimeout;

String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);

if (null != checkImmunityTimeStr) {

checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);

if (valueOfCurrentMinusBorn < checkImmunityTime) {

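// The check-back time has not been reached yet, so skip the message for now

// (the method called below re-stores the message internally)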

if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {

newOffset = i + 1;

i++;

continue;

}

}

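The message is re-stored because the offset only moves forward: there is no way to go back and re-read it, and modifying already-written data is discouraged, so a new message is appended to the commitLog each time.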
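The timing rules described above can be condensed into a small decision function. This is a deliberately simplified sketch: `CheckDecisionSketch` is hypothetical, it ignores the op-queue condition that the real code also evaluates, and the discard rule based on the maximum number of check-backs is an assumption about how checkMax is applied.

```java
class CheckDecisionSketch {
    enum Action { SKIP_FOR_NOW, CHECK_BACK, DISCARD }

    // nowMillis / bornMillis: current time and the message's born timestamp.
    // immunityMillis: period after sending during which no check-back is made.
    // checkTimes / checkMax: check-backs already made and the configured maximum.
    static Action decide(long nowMillis, long bornMillis, long immunityMillis,
                         int checkTimes, int checkMax) {
        if (checkTimes >= checkMax) {
            return Action.DISCARD;      // assumed rule: give up once the limit is reached
        }
        long age = nowMillis - bornMillis;
        if (age >= 0 && age < immunityMillis) {
            return Action.SKIP_FOR_NOW; // still inside the immunity period
        }
        // Either the immunity period has passed, or the age is negative (clock skew).
        return Action.CHECK_BACK;
    }
}
```

For example, with a 6-second immunity period a message born 1 second ago is skipped, while one born 11 seconds ago triggers a check-back.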

// Get the op messages that were pulled

List<MessageExt> opMsg = pullResult.getMsgFoundList();

boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)

// Decide whether a check-back is needed

|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))

|| (valueOfCurrentMinusBorn <= -1);

if (isNeedCheck) {

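// Append the half message to the commitLog again: the offset only moves forward and is never re-read, so anything modified is written back as a new message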

if (!putBackHalfMsgQueue(msgExt, i)) {

continue;

}

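// Send the actual check-back request

listener.resolveHalfMsg(msgExt);

Once the decision has been made, the actual check-back request is sent.

Finally, the offsets are updated:

// Update the offsets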

if (newOffset != halfOffset) {

transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);

}

long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);

if (newOpOffset != opOffset) {

transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);

}

The check-back method is shown below; it runs asynchronously:

public void resolveHalfMsg(final MessageExt msgExt) {

executorService.execute(new Runnable() {

@Override

public void run() {

try {

sendCheckMessage(msgExt);

} catch (Exception e) {

LOGGER.error("Send check message error!", e);

}

}

});

}

It sends the request to the producer:

public void sendCheckMessage(MessageExt msgExt) throws Exception {

// Build the header; the most important field is transactionId

CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();

checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());

checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());

checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));

checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());

checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());

msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));

msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));

msgExt.setStoreSize(0);

String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);

Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);

if (channel != null) {

// Send the check-back to the producer over the channel

brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);

} else {

LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);

}

}
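The asynchronous dispatch pattern used by resolveHalfMsg can be tried out in isolation. In this sketch `CheckDispatchSketch` is hypothetical, a message is reduced to its transaction id, and a transaction id starting with "no-channel" is an invented convention that models a producer group with no live connection. Unlike the real sendCheckMessage, which only logs a warning in that case, the sketch throws so that the error path of the task is exercised.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CheckDispatchSketch {
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    final List<String> sent = new CopyOnWriteArrayList<>();
    final List<String> failed = new CopyOnWriteArrayList<>();

    // Hypothetical sender; fails when no producer channel is available.
    private void sendCheck(String transactionId) {
        if (transactionId.startsWith("no-channel")) {
            throw new IllegalStateException("channel is null");
        }
        sent.add(transactionId);
    }

    // Hand the check-back to the pool so the scanning thread is never blocked by I/O.
    void resolveHalfMsg(String transactionId) {
        executor.execute(() -> {
            try {
                sendCheck(transactionId);
            } catch (Exception e) {
                // A failed check-back is recorded, not propagated; the next scan can retry.
                failed.add(transactionId);
            }
        });
    }

    // Stop accepting tasks and wait for submitted ones; returns true if all finished in time.
    boolean shutdownAndWait() {
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Catching the exception inside the task matters: an uncaught exception would otherwise be lost in the worker thread, and one failing check-back should not affect the others.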
