RMQ中的Netty

我们已经说过了Server端,我们接下来就要说一下对应的Client

Client对应的类为NettyRemotingClient

其初始化的时候

public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {

this.remotingClient = new NettyRemotingClient(nettyClientConfig);

this.remotingClient.registerRPCHook(rpcHook);

}

其构造器也是创建了线程池,EventGroupWorker

然后在Start函数中

(1)构造了执行线程池

this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(

nettyClientConfig.getClientWorkerThreads(),

new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override

public Thread newThread(Runnable r) {

return new Thread(r, “NettyClientWorkerThread_” + this.threadIndex.incrementAndGet());

}

});

(2) 组装了对应的引导类

Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)

.option(ChannelOption.TCP_NODELAY, true)

.option(ChannelOption.SO_KEEPALIVE, false)

.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())

.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())

.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())

.handler(new ChannelInitializer<SocketChannel>() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

if (nettyClientConfig.isUseTLS()) {

if (null != sslContext) {

pipeline.addFirst(defaultEventExecutorGroup, “sslHandler”, sslContext.newHandler(ch.alloc()));

log.info(“Prepend SSL handler”);

} else {

log.warn(“Connections are insecure as SSLContext is null!”);

}

}

pipeline.addLast(

defaultEventExecutorGroup,

new NettyEncoder(),

new NettyDecoder(),

new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),

new NettyConnectManageHandler(),

new NettyClientHandler());

}

});

其中的相关Handler,包含相关的编码器 解码器 以及连接管理器

最后还包含着一个NettyClientHandler

在ClientHandler中,其也是利用着和ServerHandler类似的处理方式,通过不同的Code,来选择执行器

而其执行器,则是在MQClientAPIImpl中的构造器组合了起来

发表评论

邮箱地址不会被公开。 必填项已用*标注