严格上来说一个RMQ的客户端,消息的提供者,在应用程序中初始化生产者的默认实现来进行消息的发送
那么必然从Default的生产者入手,来看一下生产者的启动流程
org.apache.rocketmq.client.producer.DefaultMQProducer
其实现了MQProducer,而MQProducer实现了MQAdmin
内部将实际的操作委托给了DefaultMQProducerImpl进行工作,基于委托而非继承
DefaultMQProducer实现MQAdmin的一些主要方法
负责Topic相关信息的交互和获取
常见的接口:
createTopic:创建主题
searchOffset 根据时间戳获取偏移量
maxOffset 最大物理偏移量
minOffset 最小物理偏移量
viewMessage 根据消息Id获取消息
queryMessage 根据条件查询消息
start和shutdown不必说
fetchPublishMessageQueues 查找这个主题下的所有消息队列
剩下的都是send函数
无非分为
同步 异步 单向
我们先行略过这些函数
看,在此类中常见的属性有哪些
private String producerGroup;
//默认topickey 用于请求queuedata private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC; //默认主题在每一个Broker的队列数量 private volatile int defaultTopicQueueNums = 4; //默认超时时间 private int sendMsgTimeout = 3000; //压缩启用临界点 private int compressMsgBodyOverHowmuch = 1024 * 4; //同步发送时候重试次数 private int retryTimesWhenSendFailed = 2; //异步发送时候重试次数 private int retryTimesWhenSendAsyncFailed = 2; //重试的时候是否不等待存储结果就返回?不太可能为true的 private boolean retryAnotherBrokerWhenNotStoreOK = false; |
对应的启动流程,需要我们看start函数
@Override
public void start() throws MQClientException { this.setProducerGroup(withNamespace(this.producerGroup)); this.defaultMQProducerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn(“trace dispatcher start failed “, e); } } } |
本质上基于的是defaultMQProducerImpl的start函数
那么impl的start如何看呢?
//初始状态
case CREATE_JUST: //先设置为创建失败,最后才设置为成功 this.serviceState = ServiceState.START_FAILED; this.checkConfig(); if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { //如果不等于内部队列,设置instanceName为进程PID this.defaultMQProducer.changeInstanceNameToPID(); } //你中有我,我中有你,impl中有default,default委托给impl //利用单例保证一个JVM只有一个MQClientManager实例 this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); |
然后,我们查看对应的MQClinetInstance实例函数
在对应的MQClientManager的getOrCreateInstance中调用
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
//创建buildkey,根据IP和对应的配置名称 String clientId = clientConfig.buildMQClientId(); //如果不设置名称,则会将名称设置位进程ID,避免相同的clientId MQClientInstance instance = this.factoryTable.get(clientId); //此instance,是RocketMQ网络交互,消息生产者,消费者,Broker,NameServer打交道的通道 if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); //放入到这个实例中的一个ConcurrentMap中去,这个Manager实例,是一个static固定的实例/JVM中只有一个 MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn(“Returned Previous MQClientInstance for clientId:[{}]”, clientId); } else { log.info(“Created new MQClientInstance for clientId:[{}]”, clientId); } } return instance; } |
而其中的clientId的获取,利用了buildMQClientId这个函数
clientId是客户端IP + instance,但是如果instance不填,会走默认的string
那么会不会出现一个客户端多个clientId相同的问题
为了避免这个问题,RMQ会将instance设置为进程Id
MQClientInstance实例负责了RMQ的网络交互相关API
然后,我们回到start中
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
//创建buildkey,根据IP和对应的配置名称 String clientId = clientConfig.buildMQClientId(); //如果不设置名称,则会将名称设置位进程ID,避免相同的clientId MQClientInstance instance = this.factoryTable.get(clientId); //此instance,是RocketMQ网络交互,消息生产者,消费者,Broker,NameServer打交道的通道 if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); //放入到这个实例中的一个ConcurrentMap中去,这个Manager实例,是一个static固定的实例/JVM中只有一个 MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn(“Returned Previous MQClientInstance for clientId:[{}]”, clientId); } else { log.info(“Created new MQClientInstance for clientId:[{}]”, clientId); } } return instance; } |
注册了本次的Instance
启动MQClientInstance,表示正式执行,主要涉及到了相关线程的启动,诸如心跳
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info(“the client factory [{}] start OK”, this.clientId); this.serviceState = ServiceState.RUNNING; break; |
接下来,我们看消息发送的前置流程