在RocketMQ和Kafka中,事务消息是如何实现的

在RocketMQ中,我们代码可以如下的书写

在客户端创建事务的时候,需要我们实现对应的两个接口

一个是事务消息执行接口,一个是事务反查接口

public class CreateOrderService {

@Inject

private OrderDao orderDao; // 注入订单表的DAO?

@Inject

private ExecutorService executorService; //注入一个ExecutorService

private TransactionMQProducer producer;

// 初始化transactionListener 和 producer

@Init

public void init() throws MQClientException {

TransactionListener transactionListener = createTransactionListener();

producer = new TransactionMQProducer(“myGroup”);

producer.setExecutorService(executorService);

producer.setTransactionListener(transactionListener);

producer.start();

}

// 创建订单服务的请求入口

@PUT

@RequestMapping(…)

public boolean createOrder(@RequestBody CreateOrderRequest request) {

// 根据创建订单请求创建一条消息

Message msg = createMessage(request);

// 发送事务消息

SendResult sendResult = producer.sendMessageInTransaction(msg, request);

// 返回:事务是否成功

return sendResult.getSendStatus() == SendStatus.SEND_OK;

}

private TransactionListener createTransactionListener() {

return new TransactionListener() {

@Override

public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

CreateOrderRequest request = (CreateOrderRequest ) arg;

try {

// 执行本地事务创建订单

orderDao.createOrderInDB(request);

// 如果没抛异常说明执行成功,提交事务消息

return LocalTransactionState.COMMIT_MESSAGE;

} catch (Throwable t) {

// 失败则直接回滚事务消息

return LocalTransactionState.ROLLBACK_MESSAGE;

}

}

// 反查本地事务

@Override

public LocalTransactionState checkLocalTransaction(MessageExt msg) {、

// 从消息中获得订单ID

String orderId = msg.getUserProperty(“orderId”);

// 去数据库中查询订单号是否存在,如果存在则提交事务;

// 如果不存在,可能是本地事务失败了,也可能是本地事务还在执行,所以返回UNKNOW

//(PS:这里RocketMQ有个拼写错误:UNKNOW)

return orderDao.isOrderIdExistsInDB(orderId)?

LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;

}

};

}

//….

}

我们在这个流程中

设置了transactionListener和发生RocketMQ事务消息的变量Producer,真正创建订单服务的方法是createOrder(),其中调用producer发送消息

然后我们实现的TransactionListener接口则是实际调用的接口,内部实现的方法主要有

executeLocalTransaction:执行本地事务,数据插入数据库

checkLocalTransaction: 反查本地事务,查看订单号是否在数据库存在

这就是客户端的基本事务代码的实现流程

然后是RMQ源码中,生产者如何发送事务消息的实现方式

public TransactionSendResult sendMessageInTransaction(final Message msg,

final LocalTransactionExecuter localTransactionExecuter, final Object arg)

throws MQClientException {

TransactionListener transactionListener = getCheckListener();

if (null == localTransactionExecuter && null == transactionListener) {

throw new MQClientException(“tranExecutor is null”, null);

}

Validators.checkMessage(msg, this.defaultMQProducer);

SendResult sendResult = null;

// 这里给消息添加了属性,标明这是一个事务消息,也就是半消息

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, “true”);

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

// 调用发送普通消息的方法,发送这条半消息

try {

sendResult = this.send(msg);

} catch (Exception e) {

throw new MQClientException(“send message Exception”, e);

}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;

Throwable localException = null;

switch (sendResult.getSendStatus()) {

case SEND_OK: {

try {

if (sendResult.getTransactionId() != null) {

msg.putUserProperty(“__transactionId__”, sendResult.getTransactionId());

}

String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);

if (null != transactionId && !””.equals(transactionId)) {

msg.setTransactionId(transactionId);

}

// 执行本地事务

if (null != localTransactionExecuter) {

localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);

} else if (transactionListener != null) {

log.debug(“Used new transaction API”);

localTransactionState = transactionListener.executeLocalTransaction(msg, arg);

}

if (null == localTransactionState) {

localTransactionState = LocalTransactionState.UNKNOW;

}

if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {

log.info(“executeLocalTransactionBranch return {}”, localTransactionState);

log.info(msg.toString());

}

} catch (Throwable e) {

log.info(“executeLocalTransactionBranch exception”, e);

log.info(msg.toString());

localException = e;

}

}

break;

case FLUSH_DISK_TIMEOUT:

case FLUSH_SLAVE_TIMEOUT:

case SLAVE_NOT_AVAILABLE:

localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;

break;

default:

break;

}

// 根据事务消息和本地事务的执行结果localTransactionState,决定提交或回滚事务消息

// 这里给Broker发送提交或回滚事务的RPC请求。

try {

this.endTransaction(sendResult, localTransactionState, localException);

} catch (Exception e) {

log.warn(“local transaction execute ” + localTransactionState + “, but end broker transaction failed”, e);

}

TransactionSendResult transactionSendResult = new TransactionSendResult();

transactionSendResult.setSendStatus(sendResult.getSendStatus());

transactionSendResult.setMessageQueue(sendResult.getMessageQueue());

transactionSendResult.setMsgId(sendResult.getMsgId());

transactionSendResult.setQueueOffset(sendResult.getQueueOffset());

transactionSendResult.setTransactionId(sendResult.getTransactionId());

transactionSendResult.setLocalTransactionState(localTransactionState);

return transactionSendResult;

}

上面的代码,首先给待发送的消息添加了一个属性,说明是半消息,然后发送出去,发送成功

然后调用我们提供的方法来执行本地事务,执行完成来决定是否提交或者回滚

然后是Broker这一端,是如何处理事务消息的

然后在处理Producer的请求的时候,会根据消息中的属性判断一下,是否是半消息

// …

if (traFlag != null && Boolean.parseBoolean(traFlag)) {

// …

putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);

} else {

putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

}

// …

判断完成,去执行业务逻辑

public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {

return store.putMessage(parseHalfMessageInner(messageInner));

}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {

// 记录消息的主题和队列,到新的属性中

MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());

MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,

String.valueOf(msgInner.getQueueId()));

msgInner.setSysFlag(

MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));

// 替换消息的主题和队列为:RMQ_SYS_TRANS_HALF_TOPIC,0

msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());

msgInner.setQueueId(0);

msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

return msgInner;

}

RMQ的处理方式,就是讲原始的主题进行保存后,将消息保存在一个特殊的内部主题中,保证用不消费

然后RMQ中,定期的进行事务发查,在Broker的TransactionalMessageCheckService中启动了一个定时器,定时的从消息队列中查出所有的半消息,然后依次的调用反查的RPC

根据反查得到的结果,决定是提交还是回滚

然后是Kafka中

在Kafka中的事务一致性,是用于解决特定场景的问题的

这种事务的一致性要么消费者和生产者有特殊联系,要么消费者就是生产者

就是Flink进行运算任务,从主题A中统计分钟内的订单收入,然后结果保存在另一个主题中,这就是消费者就是生产者

那么我们看下流程,基本如下

Kafka为了实现这个所谓的事务,引入了协调者的概念

协调者并非一个独立的进程,而是Broker进程的一部分

RMQ中,也有一个特殊的用于记录事务日志的主题,事务日志的主题实现和普通的主题一样,负责记录开启事务这类信息,而且日志主题中包含了多个分区,可以并行的执行多个事务

图片

Kafka的事务流程中,首先是生产者给协调者发一个请求开启事务,然后协调者记录下事务ID

2.生产者发送消息之前,给协调者说明发送给哪个主题和分区

3.生产者直接发送消息,Kafka保存在对应的分区中,但是有特殊标记,无法被消费

4.消息发送后,生产者给协调者发送消息,来控制提交还是回滚

5.协调者来给事务相关的分区,写入一条事务结束的消息,客户端读到这个消息,就直接放行给业务代码进行消费,协调者记录事务日志,标记事务完成

图片

那么,上面的流程,也是二阶段提交,不过需要由生产者全程控制完成

发表评论

邮箱地址不会被公开。 必填项已用*标注