本次主要介绍的流程有

1.消息批量发送

2.消息发送队列自选择

3.消息过滤

4.事务消息

5.Spring Cloud整合RocketMQ

1.消息批量发送

public static void main(String[] args) throws Exception {

DefaultMQProducer producer = new DefaultMQProducer(“BatchProducerGroupName”);

producer.start();

//If you just send messages of no more than 1MiB at a time, it is easy to use batch

//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support

String topic = “BatchTest”;

List<Message> messages = new ArrayList<>();

messages.add(new Message(topic, “Tag”, “OrderID001”, “Hello world 0”.getBytes()));

messages.add(new Message(topic, “Tag”, “OrderID002”, “Hello world 1”.getBytes()));

messages.add(new Message(topic, “Tag”, “OrderID003”, “Hello world 2”.getBytes()));

producer.send(messages);

}

直接new一个Producer,传入group

将多个Message add 进一个list中,发送出去

2.消息发送队列自选择

消息发送的时候,会根据主题的路由信息,进行负载均衡,默认轮询,如果需要订单信息稳定发送到某个消费者,我们需要使用消息队列选择器

MQProducer producer = new DefaultMQProducer(“please_rename_unique_group_name”);

producer.start();

String[] tags = new String[] {“TagA”, “TagB”, “TagC”, “TagD”, “TagE”};

for (int i = 0; i < 100; i++) {

int orderId = i % 10;

Message msg =

new Message(“TopicTestjjj”, tags[i % tags.length], “KEY” + i,

(“Hello RocketMQ ” + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

@Override

public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

Integer id = (Integer) arg;

int index = id % mqs.size();

return mqs.get(index);

}

}, orderId);

System.out.printf(“%s%n”, sendResult);

}

producer.shutdown();

上述代码中我们实现了一个消息队列选择器,进行消息的定向分配

3.消息过滤

因为现在主流的过滤方式就是表达式模式,我们将说下表达式模式中的TAG和SQL92模式

首先是Tag:

DefaultMQProducer producer = new DefaultMQProducer(“BatchProducerGroupName”);

producer.start();

//send TAG message

for (int i = 0; i < 10; i++) {

if (i > 5) {

Message message = new Message(“TopicFilter1”, “TAOPICA_TAG_ALL”, “order1”, “Hello”.getBytes(StandardCharsets.UTF_8));

producer.send(message);

} else {

Message message = new Message(“TopicFilter1”, “TAOPICA_TAG_ORD”, “order1”, “Hello”.getBytes(StandardCharsets.UTF_8));

producer.send(message);

}

}

消息发送后,对应的消费者的Expression为

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“BatchProducerGroupName”);

consumer.subscribe(“TopicFilter1″,”TAOPICA_TAG_ALL || TAOPICA_TAG_ORD”);

接下来是SQL的过滤

消息的发送为

Message message = new Message(“TopicFilter1”, “TAOPICA_TAG_ALL”, “order1”, “Hello”.getBytes(StandardCharsets.UTF_8));

message.putUserProperty(“orderStatus”,”1″);

message.putUserProperty(“slaveId”,”2″);

producer.send(message);

对应的消费者的订阅模式为

DefaultMQPushConsumer consumer2 = new DefaultMQPushConsumer(“BatchProducerGroupName”);

consumer2.subscribe(“TopicFilter1”, MessageSelector.bySql(“slaveId is not null and slaveId = 2”));

4.事务消息

我们模拟订单的流转过程

首先是创建订单,然后是等待订单真正支付后发放给下游

对应到RMQ中的事务消息,

需要实现TransactionListener的接口

其中包含两个接口

executeLocalTransaction -> 设置事务的状态,在本地事务prepare消息发送后,提交一个,推荐返回LocalTranscationState.UNKNOW

checkLocalTransaction 告知RMQ消息是否提交还是回滚

5.SpringCloud整合RMQ

整合对应的MAVEN

加上部分配置

spring:

rocketmq:

namesrvadd: localhost:298476

producerGroup:TestProducer

counsumer:TestConsumer

即可使用

发表评论

邮箱地址不会被公开。 必填项已用*标注