在RocketMQ和Kafka中,事务消息是如何实现的
在RocketMQ中,我们代码可以如下的书写
在客户端创建事务的时候,需要我们实现对应的两个接口
一个是事务消息执行接口,一个是事务反查接口
public class CreateOrderService {
@Inject private OrderDao orderDao; // 注入订单表的DAO? @Inject private ExecutorService executorService; //注入一个ExecutorService private TransactionMQProducer producer; // 初始化transactionListener 和 producer @Init public void init() throws MQClientException { TransactionListener transactionListener = createTransactionListener(); producer = new TransactionMQProducer(“myGroup”); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); } // 创建订单服务的请求入口 @PUT @RequestMapping(…) public boolean createOrder(@RequestBody CreateOrderRequest request) { // 根据创建订单请求创建一条消息 Message msg = createMessage(request); // 发送事务消息 SendResult sendResult = producer.sendMessageInTransaction(msg, request); // 返回:事务是否成功 return sendResult.getSendStatus() == SendStatus.SEND_OK; } private TransactionListener createTransactionListener() { return new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { CreateOrderRequest request = (CreateOrderRequest ) arg; try { // 执行本地事务创建订单 orderDao.createOrderInDB(request); // 如果没抛异常说明执行成功,提交事务消息 return LocalTransactionState.COMMIT_MESSAGE; } catch (Throwable t) { // 失败则直接回滚事务消息 return LocalTransactionState.ROLLBACK_MESSAGE; } } // 反查本地事务 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) {、 // 从消息中获得订单ID String orderId = msg.getUserProperty(“orderId”); // 去数据库中查询订单号是否存在,如果存在则提交事务; // 如果不存在,可能是本地事务失败了,也可能是本地事务还在执行,所以返回UNKNOW //(PS:这里RocketMQ有个拼写错误:UNKNOW) return orderDao.isOrderIdExistsInDB(orderId)? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW; } }; } //…. } |
我们在这个流程中
设置了transactionListener和发生RocketMQ事务消息的变量Producer,真正创建订单服务的方法是createOrder(),其中调用producer发送消息
然后我们实现的TransactionListener接口则是实际调用的接口,内部实现的方法主要有
executeLocalTransaction:执行本地事务,数据插入数据库
checkLocalTransaction: 反查本地事务,查看订单号是否在数据库存在
这就是客户端的基本事务代码的实现流程
然后是RMQ源码中,生产者如何发送事务消息的实现方式
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException(“tranExecutor is null”, null); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; // 这里给消息添加了属性,标明这是一个事务消息,也就是半消息 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, “true”); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); // 调用发送普通消息的方法,发送这条半消息 try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException(“send message Exception”, e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty(“__transactionId__”, sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !””.equals(transactionId)) { msg.setTransactionId(transactionId); } // 执行本地事务 if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { log.debug(“Used new transaction API”); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info(“executeLocalTransactionBranch return {}”, localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info(“executeLocalTransactionBranch exception”, e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } // 根据事务消息和本地事务的执行结果localTransactionState,决定提交或回滚事务消息 // 这里给Broker发送提交或回滚事务的RPC请求。 try { this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn(“local transaction execute ” + localTransactionState + “, but end broker transaction failed”, e); } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult; } |
上面的代码,首先给待发送的消息添加了一个属性,说明是半消息,然后发送出去,发送成功
然后调用我们提供的方法来执行本地事务,执行完成来决定是否提交或者回滚
然后是Broker这一端,是如何处理事务消息的
然后在处理Producer的请求的时候,会根据消息中的属性判断一下,是否是半消息
// …
if (traFlag != null && Boolean.parseBoolean(traFlag)) { // … putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } // … |
判断完成,去执行业务逻辑
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner)); } private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { // 记录消息的主题和队列,到新的属性中 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); // 替换消息的主题和队列为:RMQ_SYS_TRANS_HALF_TOPIC,0 msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; } |
RMQ的处理方式,就是讲原始的主题进行保存后,将消息保存在一个特殊的内部主题中,保证用不消费
然后RMQ中,定期的进行事务发查,在Broker的TransactionalMessageCheckService中启动了一个定时器,定时的从消息队列中查出所有的半消息,然后依次的调用反查的RPC
根据反查得到的结果,决定是提交还是回滚
然后是Kafka中
在Kafka中的事务一致性,是用于解决特定场景的问题的
这种事务的一致性要么消费者和生产者有特殊联系,要么消费者就是生产者
就是Flink进行运算任务,从主题A中统计分钟内的订单收入,然后结果保存在另一个主题中,这就是消费者就是生产者
那么我们看下流程,基本如下
Kafka为了实现这个所谓的事务,引入了协调者的概念
协调者并非一个独立的进程,而是Broker进程的一部分
RMQ中,也有一个特殊的用于记录事务日志的主题,事务日志的主题实现和普通的主题一样,负责记录开启事务这类信息,而且日志主题中包含了多个分区,可以并行的执行多个事务
Kafka的事务流程中,首先是生产者给协调者发一个请求开启事务,然后协调者记录下事务ID
2.生产者发送消息之前,给协调者说明发送给哪个主题和分区
3.生产者直接发送消息,Kafka保存在对应的分区中,但是有特殊标记,无法被消费
4.消息发送后,生产者给协调者发送消息,来控制提交还是回滚
5.协调者来给事务相关的分区,写入一条事务结束的消息,客户端读到这个消息,就直接放行给业务代码进行消费,协调者记录事务日志,标记事务完成
那么,上面的流程,也是二阶段提交,不过需要由生产者全程控制完成