我们说一些关于上述学习流程中遇到的问题

1.网关如何接受服务端的秒杀结果

一般来说,我们如何接受后端的秒杀结果,并给APP返回响应的呢?

这个本质上和SpringMVC以及Spring Cloud gateway没区别,内部还引入了Netty

简单的实现版本如下

public class RequestHandler {

// ID生成器

@Inject

private IdGenerator idGenerator;

// 消息队列生产者

@Inject

private Producer producer;

// 保存秒杀结果的Map

@Inject

private Map<Long, Result> results;

// 保存mutex的Map

private Map<Long, Object> mutexes = new ConcurrentHashMap<>();

// 这个网关实例的ID

@Inject

private long myId;

@Inject

private long timeout;

// 在这里处理APP的秒杀请求

public Response onRequest(Request request) {

// 获取一个进程内唯一的UUID作为请求id

Long uuid = idGenerator.next();

try {

Message msg = composeMsg(request, uuid, myId);

// 生成一个mutex,用于等待和通知

Object mutex = new Object();

mutexes.put(uuid, mutex)

// 发消息

producer.send(msg);

// 等待后端处理

synchronized(mutex) {

mutex.wait(timeout);

}

// 查询秒杀结果

Result result = results.remove(uuid);

// 检查秒杀结果并返回响应

if(null != result && result.success()){

return Response.success();

}

} catch (Throwable ignored) {}

finally {

mutexes.remove(uuid);

}

// 返回秒杀失败

return Response.fail();

}

// 在这里处理后端服务返回的秒杀结果

public void onResult(Result result) {

Object mutex = mutexes.get(result.uuid());

if(null != mutex) { // 如果查询不到,说明已经超时了,丢弃result即可。

// 登记秒杀结果

results.put(result.uuid(), result);

// 唤醒处理APP请求的线程

synchronized(mutex) {

mutex.notify();

}

}

}

}

本质上,就是去发送消息,处理并返回

我们可以选择使用RPC 的方法去处理并返回秒杀结果,RPC是发送方,后端服务是客户端,网关发送的消息中包含了网关的ID,后端可以通过网关ID来找到对应的网关实例

秒杀结果中需要包含请求ID,请求ID也是从消息中获取的

网关收到秒杀的响应结果,用请求ID为key,将结果保存到秒杀结果Map,中,去通知对应的处理APP请求的线程,结束等待

整个流程的处理如下

图片

2.RocketMQ和Kafka的消息模型

假设有一个主题Topic,我们为这个主题创建5个队列,分布到2个Broker中

图片

那么我们有三个生产者的实例 Produer0 Produer1 和 Produer2

这3个生产者如何应对2个Broker的?如何对应的5个队列?

生产者可以随便的发送到任意的一个Borker中,比如Producer0要发送5条消息,可以放到任意一个队列中,也可以每个队列各发送一次

对于消费相关

每个消费组就是一份订阅,主要是消费主体Topic下,所有队列的消息,消费组之间是互不影响的,比如我们有2个消费组 G0和G1 G0消费了哪些消息 G1是不知道的

然后是消费组的内部,一个消费组中可以包含多个消费者的实例,比如消费组G1 可以包含两个消费者C0和C1,

对应5个队列,在一个消费组中,只要保证每个队列对应一个消费者中,至于如何分配,可以hash,轮询等多种操作,只要能保证就行

而且队列只是针对消费组内部来说的,对于其他的消费组没有影响,比如队列Q2被消费组G1消费者占用了,对于消费组G2来说,完全没有影响,G2可以分配它的消费者来占用和消费队列Q2

最后就是消费位置,每个消费组内部内部维护了一组消费为止,每个队列一个消费位置,消费为止在服务端保存

和消费者没有关系,就是一个整数,记录一个消费组,队列消费到哪里了,位置之前的消息就算成功了

图片

3,如何实现单个队列的并行消费

在京东的JMQ中,实现思路如下

队列中有10条消息,对应的编号0-9,当前的消费位置是5,同时来了三个消费者拉消息,将5 6 7的消息同时分配给了三个消费者,每个人一条,然后等待响应,并先将消费位置更新为8,实现并行消费

在消费过程中,如果编号为6 7 的消息响应回来了,编号5的消息一致不回来,消费失败了,这时候我们只能将消息5复制一下,复制到一个特殊重试队列,然后继续消费

4.如何保证消息的严格顺序

在消息队列中,从主题的层面是无法保证顺序的,只有在队列层面才能能保证消息的严格顺序

所以如果业务要求严格顺序,就只能将消息队列配置为1,生产者和消费者只能是一个实例,保证全局严格有序

不过一般都是只需要保证局部有序即可,对于局部有序,我们可以在发送端,使用账户ID作为Key,采用一致性哈希算法计算出队列编号,然后总是发送到同一个队列上,保证相同的Key的消息是严格有序的,如果不考虑队列扩容,可以使用队列的数量取模的方法来计算队列编号

发表评论

邮箱地址不会被公开。 必填项已用*标注