对于消息队列来说,核心功能必然是收发消息,也就是消息的生产和消费两个流程,我们也就是要说,消息收发的两个流程
我们首先选择的是代码风格简单的RocketMQ来作为分析对象
首先看下客户端的代码,这一部分的代码,在代码中的rocketmq/client目录下
我们利用这个目录下的代码,看一下如何发送出消息的
这一步,我们首先看一下单元测试用例,因为一般所有的代码在上线提交之前,都是一个局部或者说一个小流程,可以利用其了解调用链路的流程
我们利用这个来看下有哪些API
首先是测试类的入口
DefaultMQProducerTest
测试方法主要如下
init
terminate testSendMessage_ZeroMessage testSendMessage_NoNameSrv testSendMessage_NoRoute testSendMessageSync_Success testSendMessageSync_WithBodyCompressed testSendMessageAsync_SuccesstestSendMessageAsync testSendMessageAsync_BodyCompressed testSendMessageSync_SuccessWithHook |
上面的代码中init和terminate是代码初始化和销毁的时候需要执行的代码,其他的则是测试发送的消息用例,从这个角度来看,可以利用init来学习Producer的启动流程
testSendMessageSync和testSendMessageAsync就是对应的同步发送和异步发送的流程
testSendMessageSync_WithBodyCompressed就是压缩消息发送的测试用例
Producer的入口类是
producer = new DefaultMQProducer(producerGroupTemp);
在这个DefaultMQProducer中,我们的类组合关系如下
在这中,我们利用了门面模式
让客户端提供了一个可以访问系统的接口,隐藏系统内部的复杂性
内部维护了DefaultMQProducerImpl,来将实际工作委托给他
这种方式也是门面类为了隐藏内部实现,对外提供服务的方式
然后我们init中
我们除了new 一个 Proudcer
我们还设置了对应的SrvAddr等
然后直接start()
那么这个start函数,对应的还有一个shutdown
在RocketMQ中,Producer是一个有状态的服务,我们在发送服务之前先启动Producer,然后进行操作
producer.start();
然后这个start方法的内部部实现
@Override
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn(“trace dispatcher start failed “, e);
}
}
}
直接调用了内部defaultMQProducerImpl的start
然后在Impl中的start
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException(“The producer group[” + this.defaultMQProducer.getProducerGroup()
+ “] has been created before, specify another name please.” + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
log.info(“the producer [{}] start OK. sendMessageWithVIPChannel={}”, this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException(“The producer service state not OK, maybe started once, ”
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error(“scan RequestFutureTable exception”, e);
}
}
}, 1000 * 3, 1000);
}
然后Impl中的start,我们首先检查内部的一个字段serviceState
这个字段初始值是CREATE_JUST
我们开始也将这个字段设置了START_FAILED
利用这个字段,维护了一个状态模式,表示不同的状态下不同的操作
然后获取一个单例MQClientManager中的实例mQClientFactory
在这个MQ中注册自己
然后启动这个mQClientFactory
开启心跳发送
mqClientFactory对应的类MQClientInstance是MQ中的顶级类
每个客户端对应类MQClientInstance的一个实例,维护着客户端大部分的状态信息
然后是这个
MQClientInstance中start的代码
// 启动请求响应通道
this.mQClientAPIImpl.start(); // 启动各种定时任务 this.startScheduledTask(); // 启动拉消息服务 this.pullMessageService.start(); // 启动Rebalance服务 this.rebalanceService.start(); // 启动Producer服务 this.defaultMQProducer.getDefaultMQProducerImpl().start(false); |
在mqClientAPIImpl中,我们分装了大量的API,用于客户端和Broker通信
启动各种定时任务
启动消息拉取服务
启动了Rebalance服务
启动了默认的Producer服务
上面中,我们涉及的类,主要有
DefaultMQProducetImpl:Producer的内部实现,大部分的Producer的业务逻辑,在这个类中
MQClientInstance:这个类封装了一些通用逻辑,用于和服务端交互
MQClientAPIImpl:客户端的RPC方法,实现了网络通信
NettyRemotingClient
然后是消息发送的流程
在Producer的接口MQProducer中,定义了多种不同的消息方法
可以分为
单向发送:发出去不关心响应
同步发送:等待发送后的结果
异步发送:发送后,主线程结束,等待回调
首先是异步发送的整体流程
在异步发送中,也是调用了对应的DefaultMQProducerImpl方法,做了一个委托的工作
在其中实际发送的代码如下
@Deprecated
public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
public void run() {
long costTime = System.currentTimeMillis() – beginStartTime;
if (timeout > costTime) {
try {
try {
sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
timeout – costTime);
} catch (MQBrokerException e) {
throw new MQClientException(“unknownn exception”, e);
}
} catch (Exception e) {
sendCallback.onException(e);
}
} else {
sendCallback.onException(new RemotingTooMuchRequestException(“call timeout”));
}
}
});
} catch (RejectedExecutionException e) {
throw new MQClientException(“exector rejected “, e);
}
}
在其中,也是调用了sendSelectImpl去执行发送的任务
用获取的线程进行了任务提交
让然后是sendSelectImpl()
这个方法中,利用selector获取到了对应的mq,然后调用了方法sendKernelImpl发送消息
至于对应的selector是由用户自己选择的,RocketMQ提供了多种策略模式,比如随机选择策略,哈希选择策略,同机房选择策略,如果需要,可以实现选择策略
然后是sendKernelImpl(),代码很多,基本就是构建消息的头和上下文SendMessageContext,调用方法MQClientAPIImpl.smedMessage(),消息发送给对应的Broker
消息就被发送到了远程调用的MQClientAPIImpl中,然后等待后续的序列化和网络传输等操作
底层逻辑中,也是利用一个方法参数,来区分是否是异步发送的,减少了代码的维护
那么本次的思考题,就是在源码的异步发送消息中,大部分的方法被开发者加上了@Deprecated注解,这是为何呢?
因为Netty本身就支持异步的写入消息,并注入Listener,这一步的发送,则是利用Nio的WorkGroup,这种情况下,显式的使用线程池异步的发送显得有点多余