严格上来说一个RMQ的客户端,消息的提供者,在应用程序中初始化生产者的默认实现来进行消息的发送

那么必然从Default的生产者入手,来看一下生产者的启动流程

org.apache.rocketmq.client.producer.DefaultMQProducer

其实现了MQProducer,而MQProducer实现了MQAdmin

内部将实际的操作委托给了DefaultMQProducerImpl进行工作,基于委托而非继承

DefaultMQProducer实现MQAdmin的一些主要方法

负责Topic相关信息的交互和获取

图片

常见的接口:

createTopic:创建主题

searchOffset 根据时间戳获取偏移量

maxOffset 最大物理偏移量

minOffset 最小物理偏移量

viewMessage 根据消息Id获取消息

queryMessage 根据条件查询消息

图片

start和shutdown不必说

fetchPublishMessageQueues 查找这个主题下的所有消息队列

剩下的都是send函数

无非分为

同步 异步 单向

我们先行略过这些函数

看,在此类中常见的属性有哪些

private String producerGroup;

//默认topickey 用于请求queuedata

private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;

//默认主题在每一个Broker的队列数量

private volatile int defaultTopicQueueNums = 4;

//默认超时时间

private int sendMsgTimeout = 3000;

//压缩启用临界点

private int compressMsgBodyOverHowmuch = 1024 * 4;

//同步发送时候重试次数

private int retryTimesWhenSendFailed = 2;

//异步发送时候重试次数

private int retryTimesWhenSendAsyncFailed = 2;

//重试的时候是否不等待存储结果就返回?不太可能为true的

private boolean retryAnotherBrokerWhenNotStoreOK = false;

对应的启动流程,需要我们看start函数

@Override

public void start() throws MQClientException {

this.setProducerGroup(withNamespace(this.producerGroup));

this.defaultMQProducerImpl.start();

if (null != traceDispatcher) {

try {

traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());

} catch (MQClientException e) {

log.warn(“trace dispatcher start failed “, e);

}

}

}

本质上基于的是defaultMQProducerImpl的start函数

那么impl的start如何看呢?

//初始状态

case CREATE_JUST:

//先设置为创建失败,最后才设置为成功

this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {

//如果不等于内部队列,设置instanceName为进程PID

this.defaultMQProducer.changeInstanceNameToPID();

}

//你中有我,我中有你,impl中有default,default委托给impl

//利用单例保证一个JVM只有一个MQClientManager实例

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

然后,我们查看对应的MQClinetInstance实例函数

在对应的MQClientManager的getOrCreateInstance中调用

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {

//创建buildkey,根据IP和对应的配置名称

String clientId = clientConfig.buildMQClientId();

//如果不设置名称,则会将名称设置位进程ID,避免相同的clientId

MQClientInstance instance = this.factoryTable.get(clientId);

//此instance,是RocketMQ网络交互,消息生产者,消费者,Broker,NameServer打交道的通道

if (null == instance) {

instance =

new MQClientInstance(clientConfig.cloneClientConfig(),

this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);

//放入到这个实例中的一个ConcurrentMap中去,这个Manager实例,是一个static固定的实例/JVM中只有一个

MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);

if (prev != null) {

instance = prev;

log.warn(“Returned Previous MQClientInstance for clientId:[{}]”, clientId);

} else {

log.info(“Created new MQClientInstance for clientId:[{}]”, clientId);

}

}

return instance;

}

而其中的clientId的获取,利用了buildMQClientId这个函数

clientId是客户端IP + instance,但是如果instance不填,会走默认的string

那么会不会出现一个客户端多个clientId相同的问题

为了避免这个问题,RMQ会将instance设置为进程Id

MQClientInstance实例负责了RMQ的网络交互相关API

然后,我们回到start中

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {

//创建buildkey,根据IP和对应的配置名称

String clientId = clientConfig.buildMQClientId();

//如果不设置名称,则会将名称设置位进程ID,避免相同的clientId

MQClientInstance instance = this.factoryTable.get(clientId);

//此instance,是RocketMQ网络交互,消息生产者,消费者,Broker,NameServer打交道的通道

if (null == instance) {

instance =

new MQClientInstance(clientConfig.cloneClientConfig(),

this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);

//放入到这个实例中的一个ConcurrentMap中去,这个Manager实例,是一个static固定的实例/JVM中只有一个

MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);

if (prev != null) {

instance = prev;

log.warn(“Returned Previous MQClientInstance for clientId:[{}]”, clientId);

} else {

log.info(“Created new MQClientInstance for clientId:[{}]”, clientId);

}

}

return instance;

}

注册了本次的Instance

启动MQClientInstance,表示正式执行,主要涉及到了相关线程的启动,诸如心跳

case CREATE_JUST:

this.serviceState = ServiceState.START_FAILED;

// If not specified,looking address from name server

if (null == this.clientConfig.getNamesrvAddr()) {

this.mQClientAPIImpl.fetchNameServerAddr();

}

// Start request-response channel

this.mQClientAPIImpl.start();

// Start various schedule tasks

this.startScheduledTask();

// Start pull service

this.pullMessageService.start();

// Start rebalance service

this.rebalanceService.start();

// Start push service

this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

log.info(“the client factory [{}] start OK”, this.clientId);

this.serviceState = ServiceState.RUNNING;

break;

接下来,我们看消息发送的前置流程

发表评论

邮箱地址不会被公开。 必填项已用*标注