RMQ中的Netty

我们分别看RMQ中Server端和Client两端涉及到的Netty编程,希望借此总结出Netty的使用范式

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,

final ChannelEventListener channelEventListener) {

super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());

//引导类

this.serverBootstrap = new ServerBootstrap();

this.nettyServerConfig = nettyServerConfig;

this.channelEventListener = channelEventListener;

首先是NettyRemotingServer的创建

传入对应的ServerConfig

创建一个Netty的Server引导类

并且初始化BossEventGroup和SelectorEventGroup

如果可以使用Epoll,则初始化Epoll的EventGroup

//判断是否可以使用Epoll

if (useEpoll()) {

//使用Epoll的EvertLoopGroup做Boss

this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override

public Thread newThread(Runnable r) {

return new Thread(r, String.format(“NettyEPOLLBoss_%d”, this.threadIndex.incrementAndGet()));

}

});

//使用Epoll的EvertLoopGroup做Selector

this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

//创建多少个事件处理线程

private int threadTotal = nettyServerConfig.getServerSelectorThreads();

@Override

public Thread newThread(Runnable r) {

return new Thread(r, String.format(“NettyServerEPOLLSelector_%d_%d”, threadTotal, this.threadIndex.incrementAndGet()));

}

});

不然就创建NioEventGroup

else {

this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override

public Thread newThread(Runnable r) {

return new Thread(r, String.format(“NettyNIOBoss_%d”, this.threadIndex.incrementAndGet()));

}

});

this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

private int threadTotal = nettyServerConfig.getServerSelectorThreads();

@Override

public Thread newThread(Runnable r) {

return new Thread(r, String.format(“NettyServerNIOSelector_%d_%d”, threadTotal, this.threadIndex.incrementAndGet()));

}

});

}

接下来,在remotingServer中start函数的时候

构建了ServerBootStrap

ServerBootstrap childHandler =

//之前传入的两个EventGroup

this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)

//选择不同的通道

.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)

//设置最大连接数

.option(ChannelOption.SO_BACKLOG, 1024)

//地址复用

.option(ChannelOption.SO_REUSEADDR, true)

//保持连接

.option(ChannelOption.SO_KEEPALIVE, false)

//开启Nagle算法

.childOption(ChannelOption.TCP_NODELAY, true)

//TCP的滑动窗口的两个Buf

.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())

.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())

//绑定本地地址

.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))

//创建一个Handler的构造器

.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ch.pipeline()

//添加执行器和handler

.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)

.addLast(defaultEventExecutorGroup,

//编码器

encoder,

//解码器

new NettyDecoder(),

new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),

//连接管理器

connectionManageHandler,

//

serverHandler

);

}

});

//分配的ByteBuf

if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {

childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

}

然后启动对应的bootstrap

try {

//执行bind()函数,获取bind结果

ChannelFuture sync = this.serverBootstrap.bind().sync();

InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();

this.port = addr.getPort();

} catch (InterruptedException e1) {

throw new RuntimeException(“this.serverBootstrap.bind().sync() InterruptedException”, e1);

}

在整体流程中,我们需要额外关注的,是其中对于handler的装载

声明了一个ChannelInitializer,来进行组合多个Handler

@Override

public void initChannel(SocketChannel ch) throws Exception {

ch.pipeline()

//添加执行器和handler

.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)

.addLast(defaultEventExecutorGroup,

//编码器

encoder,

//解码器

new NettyDecoder(),

new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),

//连接管理器

connectionManageHandler,

//

serverHandler

);

}

主要看的就是编码及解码器和对应的处理函数

首先是Encoder

try {

ByteBuffer header = remotingCommand.encodeHeader();

out.writeBytes(header);

byte[] body = remotingCommand.getBody();

if (body != null) {

out.writeBytes(body);

}

} catch (Exception e) {

log.error(“encode exception, ” + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);

if (remotingCommand != null) {

log.error(remotingCommand.toString());

}

RemotingUtil.closeChannel(ctx.channel());

}

encodeHeader()的内部代码如下

public ByteBuffer encodeHeader() {

return encodeHeader(this.body != null ? this.body.length : 0);

}

其中,在其中encoderHeader中就是先写入了总长度,然后分别是是header的长度和header的length

Decoder则是一个类似的相反流程,简单的上一下代码

public static RemotingCommand decode(final ByteBuffer byteBuffer) {

int length = byteBuffer.limit();

int oriHeaderLen = byteBuffer.getInt();

int headerLength = getHeaderLength(oriHeaderLen);

byte[] headerData = new byte[headerLength];

byteBuffer.get(headerData);

RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

int bodyLength = length – 4 – headerLength;

byte[] bodyData = null;

if (bodyLength > 0) {

bodyData = new byte[bodyLength];

byteBuffer.get(bodyData);

}

cmd.body = bodyData;

return cmd;

}

然后是心跳连接处理类 IdleStateHandler,负责关闭连接

以及在Netty面临事件时的处理类 connectionMangeHandler

最后放入了核心处理类,serverHandler

其中的处理函数代码如下

@Override

protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {

processMessageReceived(ctx, msg);

}

在函数中,根据Type来判断是请求还是响应

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {

final RemotingCommand cmd = msg;

if (cmd != null) {

switch (cmd.getType()) {

case REQUEST_COMMAND:

processRequestCommand(ctx, cmd);

break;

case RESPONSE_COMMAND:

processResponseCommand(ctx, cmd);

break;

default:

break;

}

}

}

其中代码分为了同步和异步方式

在基础的同步处理过程中,代码执行如下

NettyRequestProcessor processor = pair.getObject1();

RemotingCommand response = processor.processRequest(ctx, cmd);

callback.callback(response);

交给了处理器去进行处理

而Processor何时加入到ProcessorTable中的呢?

借由BrokerController在initialize()函数中的registerProcessor()

将多个RequestCode及其对应的处理器注入进入了

发表评论

邮箱地址不会被公开。 必填项已用*标注