Netty in RMQ
We will look at the Netty programming involved on the Server side and the Client side of RMQ separately, and try to distill a Netty usage paradigm from it.
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    // the server-side bootstrap
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;
First is the construction of NettyRemotingServer.
The corresponding NettyServerConfig is passed in,
a Netty ServerBootstrap is created,
and the boss EventLoopGroup and the selector EventLoopGroup are initialized.
If Epoll can be used, Epoll-based EventLoopGroups are created:
// check whether Epoll can be used
if (useEpoll()) {
    // use an Epoll EventLoopGroup as the boss (accept) group
    this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
        }
    });

    // use an Epoll EventLoopGroup as the selector (I/O) group
    this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);
        // how many event-processing threads to create
        private int threadTotal = nettyServerConfig.getServerSelectorThreads();

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
        }
    });
Otherwise, NIO-based NioEventLoopGroups are created:
} else {
    this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
        }
    });

    this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);
        private int threadTotal = nettyServerConfig.getServerSelectorThreads();

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
        }
    });
}
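The useEpoll() switch above typically checks the operating system, a config flag, and whether Netty's native transport actually loaded. A minimal sketch of such a check; the exact conditions and the config getter name are assumptions here, not copied from RocketMQ:

import io.netty.channel.epoll.Epoll;

// Sketch: only prefer Epoll on Linux, when enabled by config and the native library is usable.
private boolean useEpoll() {
    return RemotingUtil.isLinuxPlatform()               // epoll exists only on Linux
        && nettyServerConfig.isUseEpollNativeSelector() // opt-in via NettyServerConfig (assumed getter)
        && Epoll.isAvailable();                         // Netty's native transport actually loaded
}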
Next, when the remotingServer's start() method runs,
the ServerBootstrap is configured:
ServerBootstrap childHandler =
    // the two EventLoopGroups created earlier
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        // pick the channel implementation
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        // length of the pending-connection (accept) queue
        .option(ChannelOption.SO_BACKLOG, 1024)
        // allow address reuse
        .option(ChannelOption.SO_REUSEADDR, true)
        // disable TCP-level keep-alive
        .option(ChannelOption.SO_KEEPALIVE, false)
        // disable the Nagle algorithm
        .childOption(ChannelOption.TCP_NODELAY, true)
        // TCP send and receive buffer sizes
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
        // bind the local listen address
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
        // initializer that builds the handler pipeline for every accepted connection
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                    // add the executor group and the handlers
                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                    .addLast(defaultEventExecutorGroup,
                        // encoder
                        encoder,
                        // decoder
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        // connection manager
                        connectionManageHandler,
                        // core business handler
                        serverHandler
                    );
            }
        });

// use a pooled ByteBuf allocator if enabled
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
    childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
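Note that the defaultEventExecutorGroup handed to addLast() above is not one of the two EventLoopGroups; it is a separate worker pool created earlier in start() so that handler work runs off the I/O threads. A sketch of its typical construction; the thread-count getter and thread-name prefix are assumptions:

this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
    nettyServerConfig.getServerWorkerThreads(), // assumed config getter for the worker thread count
    new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            // assumed thread-name prefix
            return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
        }
    });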
Then the corresponding bootstrap is started:
try {
    // call bind() and wait for the binding result
    ChannelFuture sync = this.serverBootstrap.bind().sync();
    InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
    this.port = addr.getPort();
} catch (InterruptedException e1) {
    throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
In this overall flow, the part that deserves extra attention is how the handlers are installed.
A ChannelInitializer is declared to compose the multiple handlers:
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
        // add the executor group and the handlers
        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
        .addLast(defaultEventExecutorGroup,
            // encoder
            encoder,
            // decoder
            new NettyDecoder(),
            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
            // connection manager
            connectionManageHandler,
            // core business handler
            serverHandler
        );
}
The parts worth focusing on are the encoder, the decoder, and the corresponding processing functions.
First, the encoder:
try {
    ByteBuffer header = remotingCommand.encodeHeader();
    out.writeBytes(header);
    byte[] body = remotingCommand.getBody();
    if (body != null) {
        out.writeBytes(body);
    }
} catch (Exception e) {
    log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
    if (remotingCommand != null) {
        log.error(remotingCommand.toString());
    }
    RemotingUtil.closeChannel(ctx.channel());
}
The internals of encodeHeader() look like this:
public ByteBuffer encodeHeader() {
    return encodeHeader(this.body != null ? this.body.length : 0);
}
Inside encodeHeader(), the total length is written first, followed by the header-length field and then the header data itself.
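Putting the encoder together with the decode() code below, the resulting frame layout looks roughly as follows. This is an illustrative reconstruction derived from the decode logic, not code copied from RocketMQ, and the variable names are made up:

// Frame layout implied by encodeHeader()/decode():
//
//   | 4 bytes      | 4 bytes                              | headerLen bytes | bodyLen bytes |
//   | total length | serialize type (1B) + headerLen (3B) | header data     | body data     |
//
// where "total length" = 4 + headerLen + bodyLen, i.e. everything after the length field.

// Illustrative values; in the real code these come from header serialization and getBody().
byte[] headerData = "{\"code\":0}".getBytes(StandardCharsets.UTF_8);
byte[] bodyData = new byte[0];
int serializeType = 0; // the serialization type occupies the high byte of the second int

ByteBuffer frame = ByteBuffer.allocate(4 + 4 + headerData.length + bodyData.length);
frame.putInt(4 + headerData.length + bodyData.length);                // total length
frame.putInt((serializeType << 24) | (headerData.length & 0xFFFFFF)); // type byte + 3-byte header length
frame.put(headerData);                                                 // serialized header
frame.put(bodyData);                                                   // optional body
frame.flip();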
The decoder is a similar process in reverse; briefly, the code:
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    int length = byteBuffer.limit();
    int oriHeaderLen = byteBuffer.getInt();
    int headerLength = getHeaderLength(oriHeaderLen);

    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;

    return cmd;
}
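Note that decode() never sees the leading 4-byte total-length field: frame splitting and prefix stripping happen earlier, in the NettyDecoder added to the pipeline, which builds on Netty's LengthFieldBasedFrameDecoder. A sketch of an equivalent decoder for this layout; the max-frame-size constant and the simplified error handling are assumptions:

import java.nio.ByteBuffer;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

// Sketch: split the byte stream on the 4-byte length prefix and strip it, so the
// ByteBuffer handed to RemotingCommand.decode() starts at the header-length field.
public class NettyDecoder extends LengthFieldBasedFrameDecoder {

    private static final int FRAME_MAX_LENGTH = 16 * 1024 * 1024; // assumed upper bound

    public NettyDecoder() {
        // maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null; // not enough bytes for a complete frame yet
        }
        try {
            ByteBuffer byteBuffer = frame.nioBuffer();
            return RemotingCommand.decode(byteBuffer); // the static decode() shown above
        } finally {
            frame.release(); // the frame returned by super.decode() must be released
        }
    }
}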
Next is IdleStateHandler, the idle-connection (heartbeat) handler: after the configured idle time it fires an IdleStateEvent rather than closing the channel itself,
and connectionManageHandler, which handles Netty channel events and is what actually reacts to that event and closes the connection, as sketched below.
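A sketch of that reaction in the connection-manage handler's userEventTriggered(); the body is simplified and the channel-event notification that RocketMQ also performs is omitted:

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent event = (IdleStateEvent) evt;
        if (event.state().equals(IdleState.ALL_IDLE)) {
            // no read or write within serverChannelMaxIdleTimeSeconds: drop the connection
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
    // let later handlers see the event as well
    ctx.fireUserEventTriggered(evt);
}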
Finally, the core handler, serverHandler, is added.
Its processing method is as follows:
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    processMessageReceived(ctx, msg);
}
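channelRead0 is the callback of a SimpleChannelInboundHandler typed to RemotingCommand, so by the time it runs the decoder has already produced a complete command object. A sketch of the surrounding class; the class name and the @Sharable annotation are assumptions:

@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        // dispatch to the request/response processing shown below
        processMessageReceived(ctx, msg);
    }
}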
Inside processMessageReceived(), the command type determines whether it is a request or a response:
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}
Request handling is split into synchronous and asynchronous paths.
In the basic synchronous path, the code executes as follows:
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
In other words, the request is handed off to a processor.
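The pair here comes from the processorTable, keyed by request code, with each entry holding a processor and the executor it runs on. The processor contract itself is small; a sketch of the interface plus a purely illustrative implementation (the echo processor is not part of RocketMQ):

// Sketch of the processor contract: one method per request, plus a back-pressure hook.
public interface NettyRequestProcessor {
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception;

    boolean rejectRequest();
}

// Illustrative only: echo the request body back to the caller.
class EchoRequestProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        response.setBody(request.getBody());
        return response;
    }

    @Override
    public boolean rejectRequest() {
        return false; // never shed load in this toy example
    }
}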
So when are the processors added to the processorTable?
Via registerProcessor(), called from BrokerController's initialize() method,
which registers the various RequestCodes together with their corresponding processors.
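A sketch of what those registrations look like; the request codes, processor classes, and executor field names are recalled from BrokerController and should be treated as illustrative:

// Each registration binds a processor and its own thread pool to a request code,
// so matching requests are dispatched off the Netty I/O threads onto that pool.
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, new QueryMessageProcessor(this), this.queryMessageExecutor);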