对于消息队列来说,核心功能必然是收发消息,也就是消息的生产和消费两个流程,我们也就是要说,消息收发的两个流程

我们首先选择的是代码风格简单的RocketMQ来作为分析对象

首先看下客户端的代码,这一部分的代码,在代码中的rocketmq/client目录下

我们利用这个目录下的代码,看一下如何发送出消息的

这一步,我们首先看一下单元测试用例,因为一般所有的代码在上线提交之前,都是一个局部或者说一个小流程,可以利用其了解调用链路的流程

我们利用这个来看下有哪些API

首先是测试类的入口

DefaultMQProducerTest

测试方法主要如下

init

terminate

testSendMessage_ZeroMessage

testSendMessage_NoNameSrv

testSendMessage_NoRoute

testSendMessageSync_Success

testSendMessageSync_WithBodyCompressed

testSendMessageAsync_SuccesstestSendMessageAsync

testSendMessageAsync_BodyCompressed

testSendMessageSync_SuccessWithHook

上面的代码中init和terminate是代码初始化和销毁的时候需要执行的代码,其他的则是测试发送的消息用例,从这个角度来看,可以利用init来学习Producer的启动流程

testSendMessageSync和testSendMessageAsync就是对应的同步发送和异步发送的流程

testSendMessageSync_WithBodyCompressed就是压缩消息发送的测试用例

Producer的入口类是

producer = new DefaultMQProducer(producerGroupTemp);

在这个DefaultMQProducer中,我们的类组合关系如下

图片

在这中,我们利用了门面模式

让客户端提供了一个可以访问系统的接口,隐藏系统内部的复杂性

内部维护了DefaultMQProducerImpl,来将实际工作委托给他

这种方式也是门面类为了隐藏内部实现,对外提供服务的方式

然后我们init中

我们除了new 一个 Proudcer

我们还设置了对应的SrvAddr等

然后直接start()

那么这个start函数,对应的还有一个shutdown

在RocketMQ中,Producer是一个有状态的服务,我们在发送服务之前先启动Producer,然后进行操作

producer.start();

然后这个start方法的内部部实现

@Override

public void start() throws MQClientException {

this.setProducerGroup(withNamespace(this.producerGroup));

this.defaultMQProducerImpl.start();

if (null != traceDispatcher) {

try {

traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());

} catch (MQClientException e) {

log.warn(“trace dispatcher start failed “, e);

}

}

}

直接调用了内部defaultMQProducerImpl的start

然后在Impl中的start

public void start(final boolean startFactory) throws MQClientException {

switch (this.serviceState) {

case CREATE_JUST:

this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {

this.defaultMQProducer.changeInstanceNameToPID();

}

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

if (!registerOK) {

this.serviceState = ServiceState.CREATE_JUST;

throw new MQClientException(“The producer group[” + this.defaultMQProducer.getProducerGroup()

+ “] has been created before, specify another name please.” + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),

null);

}

this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

if (startFactory) {

mQClientFactory.start();

}

log.info(“the producer [{}] start OK. sendMessageWithVIPChannel={}”, this.defaultMQProducer.getProducerGroup(),

this.defaultMQProducer.isSendMessageWithVIPChannel());

this.serviceState = ServiceState.RUNNING;

break;

case RUNNING:

case START_FAILED:

case SHUTDOWN_ALREADY:

throw new MQClientException(“The producer service state not OK, maybe started once, ”

+ this.serviceState

+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),

null);

default:

break;

}

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

this.timer.scheduleAtFixedRate(new TimerTask() {

@Override

public void run() {

try {

RequestFutureTable.scanExpiredRequest();

} catch (Throwable e) {

log.error(“scan RequestFutureTable exception”, e);

}

}

}, 1000 * 3, 1000);

}

然后Impl中的start,我们首先检查内部的一个字段serviceState

这个字段初始值是CREATE_JUST

我们开始也将这个字段设置了START_FAILED

利用这个字段,维护了一个状态模式,表示不同的状态下不同的操作

然后获取一个单例MQClientManager中的实例mQClientFactory

在这个MQ中注册自己

然后启动这个mQClientFactory

开启心跳发送

mqClientFactory对应的类MQClientInstance是MQ中的顶级类

每个客户端对应类MQClientInstance的一个实例,维护着客户端大部分的状态信息

然后是这个

MQClientInstance中start的代码

// 启动请求响应通道

this.mQClientAPIImpl.start();

// 启动各种定时任务

this.startScheduledTask();

// 启动拉消息服务

this.pullMessageService.start();

// 启动Rebalance服务

this.rebalanceService.start();

// 启动Producer服务

this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

在mqClientAPIImpl中,我们分装了大量的API,用于客户端和Broker通信

启动各种定时任务

启动消息拉取服务

启动了Rebalance服务

启动了默认的Producer服务

上面中,我们涉及的类,主要有

DefaultMQProducetImpl:Producer的内部实现,大部分的Producer的业务逻辑,在这个类中

MQClientInstance:这个类封装了一些通用逻辑,用于和服务端交互

MQClientAPIImpl:客户端的RPC方法,实现了网络通信

NettyRemotingClient

然后是消息发送的流程

在Producer的接口MQProducer中,定义了多种不同的消息方法

可以分为

单向发送:发出去不关心响应

同步发送:等待发送后的结果

异步发送:发送后,主线程结束,等待回调

首先是异步发送的整体流程

在异步发送中,也是调用了对应的DefaultMQProducerImpl方法,做了一个委托的工作

在其中实际发送的代码如下

@Deprecated

public void send(final Message msg, final MessageQueueSelector selector, final Object arg,

final SendCallback sendCallback, final long timeout)

throws MQClientException, RemotingException, InterruptedException {

final long beginStartTime = System.currentTimeMillis();

ExecutorService executor = this.getAsyncSenderExecutor();

try {

executor.submit(new Runnable() {

@Override

public void run() {

long costTime = System.currentTimeMillis() – beginStartTime;

if (timeout > costTime) {

try {

try {

sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,

timeout – costTime);

} catch (MQBrokerException e) {

throw new MQClientException(“unknownn exception”, e);

}

} catch (Exception e) {

sendCallback.onException(e);

}

} else {

sendCallback.onException(new RemotingTooMuchRequestException(“call timeout”));

}

}

});

} catch (RejectedExecutionException e) {

throw new MQClientException(“exector rejected “, e);

}

}

在其中,也是调用了sendSelectImpl去执行发送的任务

用获取的线程进行了任务提交

让然后是sendSelectImpl()

这个方法中,利用selector获取到了对应的mq,然后调用了方法sendKernelImpl发送消息

至于对应的selector是由用户自己选择的,RocketMQ提供了多种策略模式,比如随机选择策略,哈希选择策略,同机房选择策略,如果需要,可以实现选择策略

然后是sendKernelImpl(),代码很多,基本就是构建消息的头和上下文SendMessageContext,调用方法MQClientAPIImpl.smedMessage(),消息发送给对应的Broker

消息就被发送到了远程调用的MQClientAPIImpl中,然后等待后续的序列化和网络传输等操作

底层逻辑中,也是利用一个方法参数,来区分是否是异步发送的,减少了代码的维护

那么本次的思考题,就是在源码的异步发送消息中,大部分的方法被开发者加上了@Deprecated注解,这是为何呢?

因为Netty本身就支持异步的写入消息,并注入Listener,这一步的发送,则是利用Nio的WorkGroup,这种情况下,显式的使用线程池异步的发送显得有点多余

发表评论

邮箱地址不会被公开。 必填项已用*标注