RMQ中的Netty
我们已经说过了Server端,我们接下来就要说一下对应的Client
Client对应的类为NettyRemotingClient
其初始化的时候
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
this.remotingClient = new NettyRemotingClient(nettyClientConfig); this.remotingClient.registerRPCHook(rpcHook); } |
其构造器也是创建了线程池,EventGroupWorker
然后在Start函数中
(1)构造了执行线程池
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, “NettyClientWorkerThread_” + this.threadIndex.incrementAndGet()); } }); |
(2) 组装了对应的引导类
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, “sslHandler”, sslContext.newHandler(ch.alloc())); log.info(“Prepend SSL handler”); } else { log.warn(“Connections are insecure as SSLContext is null!”); } } pipeline.addLast( defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyClientHandler()); } }); |
其中的相关Handler,包含相关的编码器 解码器 以及连接管理器
最后还包含着一个NettyClientHandler
在ClientHandler中,其也是利用着和ServerHandler类似的处理方式,通过不同的Code,来选择执行器
而其执行器,则是在MQClientAPIImpl中的构造器组合了起来