我们说一些关于上述学习流程中遇到的问题
1.网关如何接受服务端的秒杀结果
一般来说,我们如何接受后端的秒杀结果,并给APP返回响应的呢?
这个本质上和SpringMVC以及Spring Cloud gateway没区别,内部还引入了Netty
简单的实现版本如下
public class RequestHandler {
// ID生成器 @Inject private IdGenerator idGenerator; // 消息队列生产者 @Inject private Producer producer; // 保存秒杀结果的Map @Inject private Map<Long, Result> results; // 保存mutex的Map private Map<Long, Object> mutexes = new ConcurrentHashMap<>(); // 这个网关实例的ID @Inject private long myId; @Inject private long timeout; // 在这里处理APP的秒杀请求 public Response onRequest(Request request) { // 获取一个进程内唯一的UUID作为请求id Long uuid = idGenerator.next(); try { Message msg = composeMsg(request, uuid, myId); // 生成一个mutex,用于等待和通知 Object mutex = new Object(); mutexes.put(uuid, mutex) // 发消息 producer.send(msg); // 等待后端处理 synchronized(mutex) { mutex.wait(timeout); } // 查询秒杀结果 Result result = results.remove(uuid); // 检查秒杀结果并返回响应 if(null != result && result.success()){ return Response.success(); } } catch (Throwable ignored) {} finally { mutexes.remove(uuid); } // 返回秒杀失败 return Response.fail(); } // 在这里处理后端服务返回的秒杀结果 public void onResult(Result result) { Object mutex = mutexes.get(result.uuid()); if(null != mutex) { // 如果查询不到,说明已经超时了,丢弃result即可。 // 登记秒杀结果 results.put(result.uuid(), result); // 唤醒处理APP请求的线程 synchronized(mutex) { mutex.notify(); } } } } |
本质上,就是去发送消息,处理并返回
我们可以选择使用RPC 的方法去处理并返回秒杀结果,RPC是发送方,后端服务是客户端,网关发送的消息中包含了网关的ID,后端可以通过网关ID来找到对应的网关实例
秒杀结果中需要包含请求ID,请求ID也是从消息中获取的
网关收到秒杀的响应结果,用请求ID为key,将结果保存到秒杀结果Map,中,去通知对应的处理APP请求的线程,结束等待
整个流程的处理如下
2.RocketMQ和Kafka的消息模型
假设有一个主题Topic,我们为这个主题创建5个队列,分布到2个Broker中
那么我们有三个生产者的实例 Produer0 Produer1 和 Produer2
这3个生产者如何应对2个Broker的?如何对应的5个队列?
生产者可以随便的发送到任意的一个Borker中,比如Producer0要发送5条消息,可以放到任意一个队列中,也可以每个队列各发送一次
对于消费相关
每个消费组就是一份订阅,主要是消费主体Topic下,所有队列的消息,消费组之间是互不影响的,比如我们有2个消费组 G0和G1 G0消费了哪些消息 G1是不知道的
然后是消费组的内部,一个消费组中可以包含多个消费者的实例,比如消费组G1 可以包含两个消费者C0和C1,
对应5个队列,在一个消费组中,只要保证每个队列对应一个消费者中,至于如何分配,可以hash,轮询等多种操作,只要能保证就行
而且队列只是针对消费组内部来说的,对于其他的消费组没有影响,比如队列Q2被消费组G1消费者占用了,对于消费组G2来说,完全没有影响,G2可以分配它的消费者来占用和消费队列Q2
最后就是消费位置,每个消费组内部内部维护了一组消费为止,每个队列一个消费位置,消费为止在服务端保存
和消费者没有关系,就是一个整数,记录一个消费组,队列消费到哪里了,位置之前的消息就算成功了
3,如何实现单个队列的并行消费
在京东的JMQ中,实现思路如下
队列中有10条消息,对应的编号0-9,当前的消费位置是5,同时来了三个消费者拉消息,将5 6 7的消息同时分配给了三个消费者,每个人一条,然后等待响应,并先将消费位置更新为8,实现并行消费
在消费过程中,如果编号为6 7 的消息响应回来了,编号5的消息一致不回来,消费失败了,这时候我们只能将消息5复制一下,复制到一个特殊重试队列,然后继续消费
4.如何保证消息的严格顺序
在消息队列中,从主题的层面是无法保证顺序的,只有在队列层面才能能保证消息的严格顺序
所以如果业务要求严格顺序,就只能将消息队列配置为1,生产者和消费者只能是一个实例,保证全局严格有序
不过一般都是只需要保证局部有序即可,对于局部有序,我们可以在发送端,使用账户ID作为Key,采用一致性哈希算法计算出队列编号,然后总是发送到同一个队列上,保证相同的Key的消息是严格有序的,如果不考虑队列扩容,可以使用队列的数量取模的方法来计算队列编号