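In RocketMQ (RMQ), transactions are driven by the producer: the producer sends a transactional message to the Broker, and the producer then decides whether that message is really committed and made available to consumers.
The design is based on two-phase commit.
The application first sends a prepare (half) message. Once that send succeeds,
RMQ stores the message under the internal topic RMQ_SYS_TRANS_HALF_TOPIC instead of its real topic.
RMQ also runs a scheduled task that consumes the messages in RMQ_SYS_TRANS_HALF_TOPIC and sends a check request back to the message sender; the application's check callback answers with the transaction state, either rollback or commit.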
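Now let's walk through the transaction-related code.
On the producer side, transactional messages are implemented by TransactionMQProducer.
Internally it holds two important objects:

    private ExecutorService executorService;
    private TransactionListener transactionListener;

Of these two objects:
transactionListener is the transaction listener; it implements the two main operations, executing the local transaction (and so changing its state) and answering transaction checks.
executorService is the thread pool that runs transaction state checks asynchronously.
TransactionListener exposes two methods:

    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
    LocalTransactionState checkLocalTransaction(final MessageExt msg);

These methods are, respectively:
execute the local transaction
check the state of a transactional message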
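To make the division of labour between the two callbacks concrete, here is a minimal sketch that uses only the JDK. `LocalState` and `RecordingTxListener` are hypothetical stand-ins, not RocketMQ classes, and a plain `String` transaction id replaces the real `Message` / `MessageExt` arguments. The idea it illustrates is that the execute callback should record its outcome somewhere the check callback can read it later.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Stand-in for RocketMQ's LocalTransactionState so the sketch compiles without the client jar. */
enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

/** Hypothetical listener: remembers each transaction's outcome so a later check can answer. */
class RecordingTxListener {
    private final Map<String, LocalState> outcomes = new ConcurrentHashMap<>();

    /** Runs the local work after the half message is stored and records the result under txId. */
    LocalState executeLocalTransaction(String txId, Runnable localWork) {
        try {
            localWork.run();
            outcomes.put(txId, LocalState.COMMIT_MESSAGE);
        } catch (RuntimeException e) {
            outcomes.put(txId, LocalState.ROLLBACK_MESSAGE);
        }
        return outcomes.get(txId);
    }

    /** Answers a Broker check; an id that was never recorded stays UNKNOW so the Broker asks again. */
    LocalState checkLocalTransaction(String txId) {
        return outcomes.getOrDefault(txId, LocalState.UNKNOW);
    }
}
```

In a real service the map would normally be a database table written in the same local transaction as the business data, so that the answer survives a producer restart.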
With the basic fields covered, let's focus on the code.
The entry point is sendMessageInTransaction in the producer:

    @Override
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }

        msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }
This only checks that a listener exists and throws an exception if it does not.
Next comes the send flow,
in sendMessageInTransaction of DefaultMQProducerImpl:

    // check the listener again
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // set a message property that marks this as a transactional (prepared) message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // set the producer group
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // send synchronously to RMQ
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
The method then performs follow-up work based on the send result:

    // inspect the send result
    switch (sendResult.getSendStatus()) {
        // the send succeeded
        case SEND_OK: {
            try {
                // store the transaction id as a user property
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    // run the listener's executeLocalTransaction; because the Broker may check back
                    // later, the local side should keep a unique identifier it can look up
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    // no state was returned, so treat it as unknown
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            // the send failed, so mark the message for rollback
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }
Depending on success or failure, the message is marked with a different transaction state.

    try {
        // end the transaction according to the state
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }
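Finally, the transaction is ended according to that state.
The possible states are:

    COMMIT_MESSAGE,
    ROLLBACK_MESSAGE,
    UNKNOW

They mean commit, rollback, and unknown, respectively.
endTransaction does not finalize the message on the producer side; it reports the local transaction state to the Broker, and a message whose state is still unknown stays in the half topic so that it can be checked later.
That completes the sending flow on the producer.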
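The producer-side decision described above can be condensed into one small function. This is a sketch under stated assumptions, not the client's code: `SendStatus`, `LocalTxState` and `ProducerSideDecision` are hypothetical stand-ins, and the sketch assumes the state variable starts out as UNKNOW, so an exception in the local transaction leaves it unknown.

```java
import java.util.function.Supplier;

/** Stand-ins for the send statuses handled in the switch above. */
enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

/** Stand-in for the three local transaction states. */
enum LocalTxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

final class ProducerSideDecision {
    private ProducerSideDecision() { }

    /** Decides which state the producer reports to the Broker after sending the half message. */
    static LocalTxState decide(SendStatus status, Supplier<LocalTxState> localTransaction) {
        if (status != SendStatus.SEND_OK) {
            // The half message was not stored cleanly, so the local transaction never runs.
            return LocalTxState.ROLLBACK_MESSAGE;
        }
        try {
            LocalTxState state = localTransaction.get();
            // A listener that returns null is treated as "unknown".
            return state == null ? LocalTxState.UNKNOW : state;
        } catch (RuntimeException e) {
            // Assumption: the state keeps its initial UNKNOW value when the listener throws.
            return LocalTxState.UNKNOW;
        }
    }
}
```

An unknown result is not an error here; it simply defers the decision to the Broker's later check.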
Next, let's look at how the Broker accepts the prepare message.

    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (traFlag != null && Boolean.parseBoolean(traFlag)
        && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        // ... (excerpt ends here)

If the message properties mark it as a prepared message, the Broker calls prepareMessage().
The call chain ends in putHalfMessage, where the message is repackaged:
    // repackage the transactional message
    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        // back up the real topic
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        // back up the real queue id
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        // switch to the half topic
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

As the code shows, the message's topic is backed up in a property and the topic is then set to RMQ_SYS_TRANS_HALF_TOPIC.
The message is then written to the commitLog; because its topic has been changed, consumers of the real topic cannot see the transactional message yet.
In addition, RMQ itself consumes this topic on a timer.
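The topic swap described above, and the reverse step needed at commit time, can be sketched with a tiny model. `ToyMessage` and `HalfTopicRewriter` are hypothetical, and the two property keys are placeholders chosen for this sketch rather than values read from `MessageConst`.

```java
import java.util.HashMap;
import java.util.Map;

/** Minimal stand-in for a stored message: topic, queue id, and string properties. */
class ToyMessage {
    String topic;
    int queueId;
    final Map<String, String> properties = new HashMap<>();

    ToyMessage(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

final class HalfTopicRewriter {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Placeholder property keys for this sketch.
    static final String KEY_REAL_TOPIC = "REAL_TOPIC";
    static final String KEY_REAL_QUEUE_ID = "REAL_QID";

    private HalfTopicRewriter() { }

    /** Prepare phase: remember the real destination, then park the message in the half topic. */
    static void toHalf(ToyMessage m) {
        m.properties.put(KEY_REAL_TOPIC, m.topic);
        m.properties.put(KEY_REAL_QUEUE_ID, String.valueOf(m.queueId));
        m.topic = HALF_TOPIC;
        m.queueId = 0;
    }

    /** Commit phase: put the real destination back so ordinary consumers can see the message. */
    static void restore(ToyMessage m) {
        m.topic = m.properties.remove(KEY_REAL_TOPIC);
        m.queueId = Integer.parseInt(m.properties.remove(KEY_REAL_QUEUE_ID));
    }
}
```

Keeping the real destination inside the message itself means the Broker needs no side table to know where a half message belongs.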
The next question is how a transaction gets committed or rolled back.
First, the producer actively ends the transaction,
in endTransaction of DefaultMQProducerImpl:

    public void endTransaction(
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        // look up the Broker address
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        // map the local transaction state
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                // record commit or rollback in the requestHeader
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }
        // ... (rest of the method is not shown in this excerpt)
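The switch above turns the local state into a commit-or-rollback flag, and the same flag family is what `resetTransactionValue` rewrites inside `sysFlag`. The sketch below shows how such a two-bit field can be set and read. `TxOutcome` and `TxFlags` are hypothetical names, and the numeric values are my understanding of how the transaction bits are laid out; treat them as an assumption and verify against `MessageSysFlag` in your version.

```java
/** Stand-in for the producer-side outcome (same names as used in this article). */
enum TxOutcome { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

/** Transaction bits inside sysFlag; the numeric values are an assumption of this sketch. */
final class TxFlags {
    static final int NOT_TYPE = 0;
    static final int PREPARED_TYPE = 0x1 << 2;
    static final int COMMIT_TYPE = 0x2 << 2;
    static final int ROLLBACK_TYPE = 0x3 << 2; // both bits set, so it doubles as the mask

    private TxFlags() { }

    /** Maps the local outcome to the flag sent in the end-transaction request. */
    static int toFlag(TxOutcome outcome) {
        switch (outcome) {
            case COMMIT_MESSAGE: return COMMIT_TYPE;
            case ROLLBACK_MESSAGE: return ROLLBACK_TYPE;
            default: return NOT_TYPE;
        }
    }

    /** Clears the two transaction bits and writes the new type, leaving other bits alone. */
    static int reset(int sysFlag, int type) {
        return (sysFlag & ~ROLLBACK_TYPE) | type;
    }

    /** Extracts only the transaction bits. */
    static int typeOf(int sysFlag) {
        return sysFlag & ROLLBACK_TYPE;
    }
}
```

Packing the type into two bits lets the Broker change a message's transaction type without disturbing the unrelated flags that share the same integer.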
Once the request has been sent, it is handled on the Broker by EndTransactionProcessor, the processor registered with the Broker's Netty server.
Its processRequest is:

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        LOGGER.debug("Transaction request:{}", requestHeader);
        if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
            return response;
        }

        // branch on the fromTransactionCheck flag parsed from the header
        if (requestHeader.getFromTransactionCheck()) {
            switch (requestHeader.getCommitOrRollback()) {
                // unknown type: just return
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        } else {
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        }
        // at this point the request is either a commit or a rollback
        OperationResult result = new OperationResult();
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            // commit: first fetch the stored half message
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // restore the original message
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    // write it to the commitLog
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    // if that succeeded
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                        // mark the half message as removed by storing it under RMQ_SYS_TRANS_OP_HALF_TOPIC
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            // rollback: also fetch the stored half message
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // here the half message is only marked as removed; no restored message is written to the commitLog
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return res;
            }
        }
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
    }
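To summarize the flow:
the Broker first fetches the half message it stored earlier.
Then, depending on commit or rollback: for a commit it writes a new, restored copy of the message to the commitLog; for a rollback it does not write a restored message.
In both cases it then stores a record under a second topic, RMQ_SYS_TRANS_OP_HALF_TOPIC, to mark the half message as processed.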
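The summary above can be modelled with three append-only lists. `ToyTransactionalStore` is a hypothetical in-memory model, not Broker code; list indexes stand in for queue offsets, and the op queue stores the offset of the half message each record refers to.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the Broker's storage for transactional messages. All lists are append-only. */
class ToyTransactionalStore {
    final List<String> halfQueue = new ArrayList<>();    // plays RMQ_SYS_TRANS_HALF_TOPIC
    final List<Long> opQueue = new ArrayList<>();        // plays RMQ_SYS_TRANS_OP_HALF_TOPIC
    final List<String> realTopicLog = new ArrayList<>(); // what ordinary consumers can read

    /** Stores a half message and returns its offset in the half queue. */
    long prepare(String body) {
        halfQueue.add(body);
        return halfQueue.size() - 1L;
    }

    /** Commit: append a restored copy to the real topic, then record the half offset as done. */
    void commit(long halfOffset) {
        realTopicLog.add(halfQueue.get((int) halfOffset));
        opQueue.add(halfOffset);
    }

    /** Rollback: nothing becomes visible; only the "done" record is written. */
    void rollback(long halfOffset) {
        opQueue.add(halfOffset);
    }
}
```

Note that the half message is never physically deleted in this model; "removal" is expressed purely by the matching op record, which is what makes the later scan possible.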
Next is the transaction state check.
In RMQ, the TransactionalMessageCheckService thread periodically scans the messages in RMQ_SYS_TRANS_HALF_TOPIC to find out their state and decide whether each one should be rolled back or committed.
The check runs once a minute by default; the interval can be changed in the Broker configuration file (transactionCheckInterval).

    @Override
    protected void onWaitEnd() {
        // the real work done when the wait ends
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        // we now have the transaction timeout and the maximum number of checks
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        // start checking
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }
In the wait code above,
we read the transaction timeout and the maximum number of transaction checks.
Then the check itself runs; its logic is as follows:
TransactionalMessageService#check()

    // get the topic
    String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
    // first fetch its MessageQueues
    Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
    if (msgQueues == null || msgQueues.size() == 0) {
        log.warn("The queue of topic is empty :" + topic);
        return;
    }
The topic is used to fetch the MessageQueues that belong to it.

    for (MessageQueue messageQueue : msgQueues) {
        // iterate over each MessageQueue
        long startTime = System.currentTimeMillis();
        // remember that transaction handling uses two queues: one for messages not yet processed, one for processed ones
        // the line below derives the processed (op) queue from the unprocessed (half) queue
        MessageQueue opQueue = getOpQueue(messageQueue);
        long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
        long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
        log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
        if (halfOffset < 0 || opOffset < 0) {
            log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                halfOffset, opOffset);
            continue;
        }
        // ... (loop body continues)
The processed (op) queue is derived from the MessageQueue that stores the half messages.

    List<Long> doneOpOffset = new ArrayList<>();
    HashMap<Long, Long> removeMap = new HashMap<>();
    // starting from the current progress, pull 32 messages from the processed queue and work out which half messages are already handled
    PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
    if (null == pullResult) {
        log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
            messageQueue, halfOffset, opOffset);
        continue;
    }
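32 messages are pulled from the processed queue so they can be compared against the half messages.
Before processing, a few local variables are declared:

    // single thread
    // number of times an empty (null) message has been fetched
    int getMessageNullCount = 1;
    // latest processed offset of the half topic
    long newOffset = halfOffset;
    // queue offset currently being examined
    long i = halfOffset;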
Then processing begins:

    // this half message has already been processed
    if (removeMap.containsKey(i)) {
        log.info("Half offset {} has been committed/rolled back", i);
        Long removedOpOffset = removeMap.remove(i);
        doneOpOffset.add(removedOpOffset);
        // ... (excerpt ends here)
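If the message has already been processed, it is simply removed from the map and the scan offset moves forward.
What remains are unprocessed messages, for which we must decide whether it is time to send a check.

    // now the timing question
    // current time minus the message's born timestamp
    long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
    // checkImmunityTime: the immunity window; after a prepare message is sent the producer gets this much time
    // to commit on its own, and no check is sent during it
    // transactionTimeout: the transaction timeout; once it has elapsed a check message must be sent
    long checkImmunityTime = transactionTimeout;
    String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
    if (null != checkImmunityTimeStr) {
        checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
        if (valueOfCurrentMinusBorn < checkImmunityTime) {
            // not yet time to check, so skip this message
            // note that the called method stores the message again
            if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                newOffset = i + 1;
                i++;
                continue;
            }
        }
        // ... (excerpt ends here)

The message is stored again because offsets only ever move forward: there is no way to go back and look at it later, and modifying stored data in place is discouraged, so a new copy of the message is appended to the commitLog instead.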
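The timing rule behind the immunity window is small enough to isolate. The sketch below is an assumption-laden simplification: `ImmunityWindow` is a hypothetical helper, and the fallback rules for a missing, malformed, or negative override are my reading of what such a helper should do, not a copy of `getImmunityTime`.

```java
/** Hypothetical helper that decides whether a half message is still too young to be checked. */
final class ImmunityWindow {
    private ImmunityWindow() { }

    /** Per-message override in seconds; falls back to the Broker-wide timeout when unusable. */
    static long windowMillis(String overrideSeconds, long defaultMillis) {
        if (overrideSeconds == null) {
            return defaultMillis;
        }
        try {
            long seconds = Long.parseLong(overrideSeconds.trim());
            return seconds < 0 ? defaultMillis : seconds * 1000L;
        } catch (NumberFormatException e) {
            return defaultMillis;
        }
    }

    /** True while the message's age is inside the window, so no check should be sent yet. */
    static boolean stillImmune(long nowMillis, long bornMillis, long windowMillis) {
        long age = nowMillis - bornMillis;
        return age >= 0 && age < windowMillis;
    }
}
```

For example, with a 6-second window a message born at t = 5 s is still immune at t = 10 s but becomes eligible for a check at t = 12 s.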
    // get the pulled op messages
    List<MessageExt> opMsg = pullResult.getMsgFoundList();
    // decide whether a check is needed
    boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
        || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
        || (valueOfCurrentMinusBorn <= -1);

    if (isNeedCheck) {
        // store the message in the commitLog again: offsets only move forward and are never revisited,
        // so anything that changes is appended as a new message
        if (!putBackHalfMsgQueue(msgExt, i)) {
            continue;
        }
        // send the actual check message
        listener.resolveHalfMsg(msgExt);
        // ... (excerpt ends here)

Once the decision is made, the actual check message is sent.
Finally, the offsets are updated:

    // update the offsets
    if (newOffset != halfOffset) {
        transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
    }
    long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
    if (newOpOffset != opOffset) {
        transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
    }
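Stripped of pulling, logging, and timing, the scan comes down to walking the half queue once and splitting offsets into "already decided" and "needs a check". The sketch below is a deliberately simplified model: `HalfQueueScan` is hypothetical, it assumes every undecided message is already past its immunity window, and it ignores the re-append of checked messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Result of one pass over a half queue in this toy model. */
class HalfQueueScan {
    final List<Long> toCheck = new ArrayList<>(); // offsets whose producer must be asked
    long newOffset;                               // where the next pass should start

    /**
     * Walks offsets [halfOffset, maxOffset). Offsets present in {@code decided} (built from the
     * op queue) are skipped; every other offset is queued for a check. The cursor only advances.
     */
    static HalfQueueScan scan(long halfOffset, long maxOffset, Set<Long> decided) {
        HalfQueueScan result = new HalfQueueScan();
        long i = halfOffset;
        while (i < maxOffset) {
            if (!decided.contains(i)) {
                result.toCheck.add(i);
            }
            i++;
        }
        result.newOffset = i;
        return result;
    }
}
```

Because the cursor never moves backwards, a message that still has no answer after its check has to be appended again at the tail, which is why the excerpts above re-store half messages before checking them.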
The check function itself is shown below.
It runs asynchronously:

    public void resolveHalfMsg(final MessageExt msgExt) {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    sendCheckMessage(msgExt);
                } catch (Exception e) {
                    LOGGER.error("Send check message error!", e);
                }
            }
        });
    }
It sends a request to the producer:

    public void sendCheckMessage(MessageExt msgExt) throws Exception {
        // build the header; the most important field is transactionId
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgExt.setStoreSize(0);
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
        if (channel != null) {
            // send it back to the producer through the channel
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
    }
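The dispatch pattern used here (look up any live producer of the group, then run the check on a thread pool) can be sketched with JDK classes alone. `ToyCheckDispatcher` is hypothetical; a `Function` stands in for the Netty channel to a producer, and plain strings stand in for the transaction state. Unlike the asynchronous code above, this sketch waits for the answer so the result is easy to observe.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Toy Broker-side dispatcher: finds a producer by group and asks it for a transaction's state. */
class ToyCheckDispatcher {
    // group -> callback standing in for a live producer channel (hypothetical)
    private final Map<String, Function<String, String>> channels = new ConcurrentHashMap<>();
    // daemon worker so an unclosed dispatcher cannot keep the JVM alive
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "tx-check");
        t.setDaemon(true);
        return t;
    });

    void register(String group, Function<String, String> producerCheck) {
        channels.put(group, producerCheck);
    }

    /** Runs the check on the pool and waits; empty means no producer of that group is connected. */
    Optional<String> check(String group, String transactionId) {
        Callable<Optional<String>> task = () -> {
            Function<String, String> channel = channels.get(group);
            if (channel == null) {
                return Optional.empty();
            }
            return Optional.of(channel.apply(transactionId));
        };
        try {
            return executor.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Looking producers up by group rather than by connection is what lets any surviving instance of the same service answer for a transaction started by an instance that has since crashed, provided the state was recorded in shared storage.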