本次主要介绍的流程有
1.消息批量发送
2.消息发送队列自选择
3.消息过滤
4.事务消息
5.Spring Cloud整合RocketMQ
1.消息批量发送
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(“BatchProducerGroupName”); producer.start(); //If you just send messages of no more than 1MiB at a time, it is easy to use batch //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support String topic = “BatchTest”; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, “Tag”, “OrderID001”, “Hello world 0”.getBytes())); messages.add(new Message(topic, “Tag”, “OrderID002”, “Hello world 1”.getBytes())); messages.add(new Message(topic, “Tag”, “OrderID003”, “Hello world 2”.getBytes())); producer.send(messages); } |
直接new一个Producer,传入group
将多个Message add 进一个list中,发送出去
2.消息发送队列自选择
消息发送的时候,会根据主题的路由信息,进行负载均衡,默认轮询,如果需要订单信息稳定发送到某个消费者,我们需要使用消息队列选择器
MQProducer producer = new DefaultMQProducer(“please_rename_unique_group_name”);
producer.start(); String[] tags = new String[] {“TagA”, “TagB”, “TagC”, “TagD”, “TagE”}; for (int i = 0; i < 100; i++) { int orderId = i % 10; Message msg = new Message(“TopicTestjjj”, tags[i % tags.length], “KEY” + i, (“Hello RocketMQ ” + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf(“%s%n”, sendResult); } producer.shutdown(); |
上述代码中我们实现了一个消息队列选择器,进行消息的定向分配
3.消息过滤
因为现在主流的过滤方式就是表达式模式,我们将说下表达式模式中的TAG和SQL92模式
首先是Tag:
DefaultMQProducer producer = new DefaultMQProducer(“BatchProducerGroupName”);
producer.start(); //send TAG message for (int i = 0; i < 10; i++) { if (i > 5) { Message message = new Message(“TopicFilter1”, “TAOPICA_TAG_ALL”, “order1”, “Hello”.getBytes(StandardCharsets.UTF_8)); producer.send(message); } else { Message message = new Message(“TopicFilter1”, “TAOPICA_TAG_ORD”, “order1”, “Hello”.getBytes(StandardCharsets.UTF_8)); producer.send(message); } } |
消息发送后,对应的消费者的Expression为
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“BatchProducerGroupName”);
consumer.subscribe(“TopicFilter1″,”TAOPICA_TAG_ALL || TAOPICA_TAG_ORD”); |
接下来是SQL的过滤
消息的发送为
Message message = new Message(“TopicFilter1”, “TAOPICA_TAG_ALL”, “order1”, “Hello”.getBytes(StandardCharsets.UTF_8));
message.putUserProperty(“orderStatus”,”1″); message.putUserProperty(“slaveId”,”2″); producer.send(message); |
对应的消费者的订阅模式为
DefaultMQPushConsumer consumer2 = new DefaultMQPushConsumer(“BatchProducerGroupName”);
consumer2.subscribe(“TopicFilter1”, MessageSelector.bySql(“slaveId is not null and slaveId = 2”)); |
4.事务消息
我们模拟订单的流转过程
首先是创建订单,然后是等待订单真正支付后发放给下游
对应到RMQ中的事务消息,
需要实现TransactionListener的接口
其中包含两个接口
executeLocalTransaction -> 设置事务的状态,在本地事务prepare消息发送后,提交一个,推荐返回LocalTranscationState.UNKNOW
checkLocalTransaction 告知RMQ消息是否提交还是回滚
5.SpringCloud整合RMQ
整合对应的MAVEN
加上部分配置
spring:
rocketmq:
namesrvadd: localhost:298476
producerGroup:TestProducer
counsumer:TestConsumer
即可使用