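High availability (HA) is a capability that practically every middleware system must provide.
To keep messages highly available and prevent a single broker failure from leaving messages unconsumable, RocketMQ (RMQ) supports master/slave broker pairs. When a message reaches the master, it is replicated to the slave, so if the master goes down, consumers can still pull messages from the slave.
The main HA classes are: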
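HAService: core implementation of RocketMQ master/slave replication
HAService$AcceptSocketService: master-side listener that accepts slave connections
HAService$GroupTransferService: notifies waiting requests when replication to the slave has completed
HAService$HAClient: HA client (slave side)
HAConnection: master-side wrapper around one HA connection, implementing its reads and writes
HAConnection$ReadSocketService: HA network read service
HAConnection$WriteSocketService: HA network write service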
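Starting from HAService, let's look at how HA works. The mechanism is:
(1) The master starts and listens for slave connections on a dedicated port.
(2) The slave actively connects to the master; the master accepts the connection, establishing a TCP connection between them.
(3) The slave sends the master the offset from which it wants to pull messages; the master parses the request and returns messages to the slave.
(4) The slave stores the messages and sends a new sync request.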
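To make the four steps concrete, here is a minimal in-memory model of the pull loop. It is a sketch under simplifying assumptions: MiniMaster and MiniSlave are hypothetical classes, there are no sockets, and each CommitLog is just a byte array whose length serves as its max offset.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Hypothetical in-memory model of steps (3) and (4); not RocketMQ code.
class MiniMaster {
    private final byte[] commitLog;
    private final int batchSize;

    MiniMaster(byte[] commitLog, int batchSize) {
        this.commitLog = commitLog;
        this.batchSize = batchSize;
    }

    // Step (3): return up to batchSize bytes starting at the offset the slave reported
    byte[] pull(long slaveOffset) {
        int from = (int) slaveOffset;
        int to = Math.min(commitLog.length, from + batchSize);
        return Arrays.copyOfRange(commitLog, from, to);
    }
}

class MiniSlave {
    private final ByteArrayOutputStream local = new ByteArrayOutputStream();

    // The slave's max offset is simply how many bytes it has stored
    long maxOffset() {
        return local.size();
    }

    // Step (4): store the data; the next request uses the new max offset
    void append(byte[] data) {
        local.write(data, 0, data.length);
    }

    byte[] data() {
        return local.toByteArray();
    }

    // Repeat report -> pull -> append until the master has nothing new
    static MiniSlave replicate(MiniMaster master) {
        MiniSlave slave = new MiniSlave();
        while (true) {
            byte[] batch = master.pull(slave.maxOffset());
            if (batch.length == 0) {
                return slave;
            }
            slave.append(batch);
        }
    }
}
```

Because the slave always reports its own max offset, the master never needs to remember per-slave progress beyond the current request.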
The overall flow is laid out in the start method:

```java
// Entry point
public void start() throws Exception {
    this.acceptSocketService.beginAccept();
    this.acceptSocketService.start();
    this.groupTransferService.start();
    this.haClient.start();
}
```
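First is AcceptSocketService, an inner class of HAService that implements the master-side listener for slave connections.
Its fields are:

```java
// Address the broker listens on for HA connections (local IP + port)
private final SocketAddress socketAddressListen;
// NIO server socket channel
private ServerSocketChannel serverSocketChannel;
// Event selector
private Selector selector;
```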
start() first calls beginAccept():

```java
public void beginAccept() throws Exception {
    // Open the server channel
    this.serverSocketChannel = ServerSocketChannel.open();
    // Create the selector
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    // Bind to the HA listen port
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    // Non-blocking mode (required to register with a Selector)
    this.serverSocketChannel.configureBlocking(false);
    // Register interest in accept events
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
```
It then calls acceptSocketService.start() to start the service thread, whose run method is:

```java
while (!this.isStopped()) {
    try {
        // Wait up to 1000 ms for ready events
        this.selector.select(1000);
        Set<SelectionKey> selected = this.selector.selectedKeys();
        if (selected != null) {
            for (SelectionKey k : selected) {
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                    // Accept the incoming slave connection
                    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                    if (sc != null) {
                        HAService.log.info("HAService receive new connection, "
                            + sc.socket().getRemoteSocketAddress());
                        try {
                            // Wrap each channel in an HAConnection
                            HAConnection conn = new HAConnection(HAService.this, sc);
                            // Start its read and write services
                            conn.start();
                            // Track the connection
                            HAService.this.addConnection(conn);
                        } catch (Exception e) {
                            log.error("new HAConnection exception", e);
                            sc.close();
                        }
                    }
                } else {
                    log.warn("Unexpected ops in select " + k.readyOps());
                }
            }
            selected.clear();
        }
    } catch (Exception e) {
        log.error(this.getServiceName() + " service has exception.", e);
    }
}
```
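The HAConnection objects created by AcceptSocketService carry out the master/slave data replication. In this loop, select(1000) blocks for at most 1 s, so ready accept events are handled as they arrive, and otherwise the thread wakes once a second to check whether it has been stopped. An HAConnection is created for every new connection.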
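The accept pattern above can be reproduced with plain JDK NIO. This is a sketch, not RocketMQ code: AcceptLoopSketch is a hypothetical class, it binds to an OS-assigned port on 127.0.0.1, and accepted channels are simply collected in a list where RocketMQ would create an HAConnection. It removes each selected key through the iterator, which has the same effect as selected.clear() in the original loop.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class AcceptLoopSketch {
    private ServerSocketChannel server;
    private Selector selector;
    // Stand-in for HAService's connection list
    final List<SocketChannel> accepted = new ArrayList<>();
    private final List<SocketChannel> clients = new ArrayList<>();

    AcceptLoopSketch() {
        try {
            server = ServerSocketChannel.open();
            selector = Selector.open();
            server.socket().setReuseAddress(true);
            // Port 0: let the OS choose a free port (demo assumption)
            server.socket().bind(new InetSocketAddress("127.0.0.1", 0));
            // Must be non-blocking before registering with a Selector
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // One iteration of the accept loop: wait up to timeoutMs, accept ready connections
    void pollOnce(long timeoutMs) {
        try {
            selector.select(timeoutMs);
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey k = it.next();
                it.remove();
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                    if (sc != null) {
                        accepted.add(sc); // RocketMQ wraps sc in an HAConnection here
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Simulates a slave connecting to this listener
    SocketChannel connectClient() {
        try {
            SocketChannel c = SocketChannel.open(
                new InetSocketAddress("127.0.0.1", server.socket().getLocalPort()));
            clients.add(c);
            return c;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void close() {
        try {
            for (SocketChannel sc : accepted) sc.close();
            for (SocketChannel c : clients) c.close();
            selector.close();
            server.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```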
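Next, GroupTransferService is started; it is another inner class of HAService.
Its job is to hold producer requests. In synchronous replication (SYNC_MASTER), after the master has stored a producer's message, the request must wait until the data has been replicated to the slave. Once the slave has acknowledged it, GroupTransferService wakes the request so that the result can be returned to the producer.
Let's start with its core logic, doWaitTransfer: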
```java
private void doWaitTransfer() {
    // Lock the read-side request list so no other thread touches it;
    // requests are moved here from the write-side list by swapping the two lists
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                // Replicated once the slave's offset has reached this request's next offset
                boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                // Deadline = now + syncFlushTimeout
                long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
                    + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
                while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
                    // Not replicated and not timed out: wait up to 1 s or until notified
                    this.notifyTransferObject.waitForRunning(1000);
                    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                }
                if (!transferOK) {
                    log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                }
                // Wake the waiting producer request with the result status
                req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }
            this.requestsRead.clear();
        }
    }
}
```
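The key check in this code is whether the slave's acknowledged offset (push2SlaveMaxOffset) is greater than or equal to the next offset of the producer's request (req.getNextOffset()). That offset is advanced by: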
```java
// Called with the offset reported by the slave
public void notifyTransferSome(final long offset) {
    for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
        // Advance the offset only if no other thread changed it in the meantime (CAS)
        boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
        if (ok) {
            // Wake GroupTransferService
            this.groupTransferService.notifyTransferSome();
            break;
        } else {
            value = this.push2SlaveMaxOffset.get();
        }
    }
}
```
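This method is called with the offset the slave reports back. It only ever moves push2SlaveMaxOffset forward, and when it does, its call to groupTransferService.notifyTransferSome() wakes the thread waiting in doWaitTransfer.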
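The handshake between the read thread and GroupTransferService can be sketched with a plain Java monitor. Assumptions: AckWaitSketch is a hypothetical class, System.currentTimeMillis() replaces the store's system clock, and Object.wait/notifyAll replace RocketMQ's ServiceThread waitForRunning/wakeup. waitForSlave returning true corresponds to PUT_OK, false to FLUSH_SLAVE_TIMEOUT.

```java
import java.util.concurrent.atomic.AtomicLong;

class AckWaitSketch {
    private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
    private final Object lock = new Object();

    // Like HAService.notifyTransferSome: CAS loop that only moves the offset forward
    void notifyTransferSome(long offset) {
        for (long value = push2SlaveMaxOffset.get(); offset > value; ) {
            if (push2SlaveMaxOffset.compareAndSet(value, offset)) {
                synchronized (lock) {
                    lock.notifyAll(); // wake waiting producer requests
                }
                break;
            }
            value = push2SlaveMaxOffset.get();
        }
    }

    // Like the per-request part of doWaitTransfer: wait until acked or deadline passes
    boolean waitForSlave(long nextOffset, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        synchronized (lock) {
            while (push2SlaveMaxOffset.get() < nextOffset) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false; // FLUSH_SLAVE_TIMEOUT
                }
                try {
                    lock.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true; // PUT_OK
        }
    }

    long ackedOffset() {
        return push2SlaveMaxOffset.get();
    }
}
```

The waiter re-checks the offset under the lock before every wait, so a notification that arrives between the check and the wait cannot be lost.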
On the slave side, the work is done by HAClient, which is also an inner class of HAService.
Its main fields are:
```java
// Size of the socket read buffer
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
// Master address
private final AtomicReference<String> masterAddress = new AtomicReference<>();
// Buffer holding the slave's offset report
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
// Network channel
private SocketChannel socketChannel;
// Selector
private Selector selector;
// Timestamp of the last write to the master
private long lastWriteTimestamp = System.currentTimeMillis();
// Slave CommitLog offset most recently reported to the master
private long currentReportedOffset = 0;
// Position in the read buffer up to which data has been dispatched
private int dispatchPosition = 0;
// Read buffer
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// Backup read buffer (used by reallocateByteBuffer)
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
```
```java
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    // Loop until the service is stopped
    while (!this.isStopped()) {
        try {
            // Try to connect to the master
            if (this.connectMaster()) {
                // Is it time to report the current offset to the master?
                if (this.isTimeToReportOffset()) {
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        // Report failed: close the connection to the master
                        this.closeMaster();
                    }
                }
                // Wait up to 1 s for events
                this.selector.select(1000);
                // Handle network reads
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }
                // Housekeeping: close the connection if nothing was written for too long
                long interval = HAService.this.getDefaultMessageStore().getSystemClock().now()
                    - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                        + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                // Not connected: retry in 5 s
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }
    log.info(this.getServiceName() + " service end");
}
```
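The overall flow is: connect to the master -> decide whether to report the current offset to the master -> select events -> process network reads -> check whether the connection has expired.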
Let's go through the methods called from run in order, starting with connecting to the master:
```java
private boolean connectMaster() throws ClosedChannelException {
    // Only (re)connect when there is no channel yet
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            // Connect to the master
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        // Local maximum CommitLog offset
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        // Current time
        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    // Connected?
    return this.socketChannel != null;
}
```
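It uses RemotingUtil to connect to the master, records the local maximum CommitLog offset, and returns whether the connection succeeded.
Next, the time elapsed since the last write decides whether the offset should be reported to the master: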
```java
private boolean isTimeToReportOffset() {
    long interval = HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
    boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
        .getHaSendHeartbeatInterval();
    return needHeart;
}
```
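Both timing checks in HAClient compare the time since lastWriteTimestamp against a configured interval: haSendHeartbeatInterval decides when to report the offset, and haHousekeepingInterval decides when the connection is considered dead. A sketch with the clock passed in explicitly; HaTimers is a hypothetical class, and the interval values in the test are illustrative, not RocketMQ defaults.

```java
class HaTimers {
    // Like isTimeToReportOffset(): report once the heartbeat interval has elapsed
    static boolean isTimeToReportOffset(long now, long lastWriteTimestamp, long heartbeatIntervalMs) {
        return now - lastWriteTimestamp > heartbeatIntervalMs;
    }

    // Like the housekeeping check at the end of run(): the connection has expired
    static boolean isExpired(long now, long lastWriteTimestamp, long housekeepingIntervalMs) {
        return now - lastWriteTimestamp > housekeepingIntervalMs;
    }
}
```

Because each successful report refreshes lastWriteTimestamp, a healthy connection keeps resetting the housekeeping timer; the housekeeping check only fires when writes stop succeeding for longer than the housekeeping interval.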
If it is time, the offset is reported to the master:

```java
// Called with the slave's current maximum offset
private boolean reportSlaveMaxOffset(final long maxOffset) {
    // Reset the buffer for writing, put the offset, then rewind it for the channel write
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    // At most 3 write attempts
    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }
    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    // True only if all 8 bytes were written
    return !this.reportOffset.hasRemaining();
}
```
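The argument is the slave's local maximum offset. The 8-byte buffer is written at most three times, and the method returns true only if all 8 bytes were sent, that is, if reportOffset has no bytes remaining.
Finally, read events are processed: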
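The retry logic can be exercised without a real socket. In this sketch OffsetReporter mirrors reportSlaveMaxOffset, and TrickleChannel is a hypothetical test channel that accepts only a few bytes per write call, imitating a socket whose send buffer is nearly full. Three attempts of 3 bytes cover the 8-byte offset; three attempts of 2 bytes do not.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

class OffsetReporter {
    private final ByteBuffer reportOffset = ByteBuffer.allocate(8);

    // Up to 3 write attempts; true only if all 8 bytes went out
    boolean report(WritableByteChannel channel, long maxOffset) {
        reportOffset.position(0);
        reportOffset.limit(8);
        reportOffset.putLong(maxOffset); // big-endian by default
        reportOffset.position(0);
        reportOffset.limit(8);
        for (int i = 0; i < 3 && reportOffset.hasRemaining(); i++) {
            try {
                channel.write(reportOffset);
            } catch (IOException e) {
                return false;
            }
        }
        return !reportOffset.hasRemaining();
    }
}

// Hypothetical channel that accepts at most maxBytesPerWrite bytes per call
class TrickleChannel implements WritableByteChannel {
    private final int maxBytesPerWrite;
    final ByteBuffer received = ByteBuffer.allocate(64);

    TrickleChannel(int maxBytesPerWrite) {
        this.maxBytesPerWrite = maxBytesPerWrite;
    }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(maxBytesPerWrite, src.remaining());
        for (int i = 0; i < n; i++) {
            received.put(src.get());
        }
        return n;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}
```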
```java
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    // Read while the buffer still has free space
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // Read data into byteBufferRead
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                // Parse the received data and append it to the CommitLog
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                // End this pass after 3 consecutive empty reads
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }
    return true;
}
```
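This method processes the data sent back by the master:
If a read returns more than 0 bytes, dispatchReadRequest() is called to process the data.
If read returns 0 bytes three times in a row, the current read pass ends and the method returns true.
If read returns a negative value (the master closed the connection) or an IOException occurs, the method returns false.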
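The read-loop policy can be tested against a scripted channel. ReadLoopSketch mirrors the structure of processReadEvent but only counts bytes where RocketMQ calls dispatchReadRequest(); ScriptedChannel is a hypothetical channel that returns a predefined sequence of read sizes (then 0 forever), with -1 standing for end of stream.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayDeque;
import java.util.Deque;

class ReadLoopSketch {
    int bytesSeen = 0;

    boolean processReadEvent(ReadableByteChannel ch, ByteBuffer buf) {
        int readSizeZeroTimes = 0;
        while (buf.hasRemaining()) {
            try {
                int readSize = ch.read(buf);
                if (readSize > 0) {
                    readSizeZeroTimes = 0;    // data arrived: reset the empty-read counter
                    bytesSeen += readSize;    // RocketMQ calls dispatchReadRequest() here
                } else if (readSize == 0) {
                    if (++readSizeZeroTimes >= 3) {
                        break;                // 3 empty reads in a row: end this pass
                    }
                } else {
                    return false;             // end of stream
                }
            } catch (IOException e) {
                return false;
            }
        }
        return true;
    }
}

// Hypothetical channel returning a scripted sequence of read sizes
class ScriptedChannel implements ReadableByteChannel {
    private final Deque<Integer> script = new ArrayDeque<>();

    ScriptedChannel(int... sizes) {
        for (int s : sizes) script.add(s);
    }

    @Override
    public int read(ByteBuffer dst) {
        int n = script.isEmpty() ? 0 : script.poll();
        n = Math.min(n, dst.remaining());
        for (int i = 0; i < n; i++) dst.put((byte) 0);
        return n;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}
```

In the first test case the 7-byte read is never reached: the three empty reads before it end the pass.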
```java
private boolean dispatchReadRequest() {
    final int msgHeaderSize = 8 + 4; // phyoffset + size
    int readSocketPos = this.byteBufferRead.position();
    while (true) {
        // Bytes received but not yet dispatched
        int diff = this.byteBufferRead.position() - this.dispatchPosition;
        if (diff >= msgHeaderSize) {
            // Master's physical offset for this frame, and the body size
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
            int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            if (slavePhyOffset != 0) {
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                        + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    return false;
                }
            }
            if (diff >= (msgHeaderSize + bodySize)) {
                // The whole body has arrived
                byte[] bodyData = new byte[bodySize];
                this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                this.byteBufferRead.get(bodyData);
                // Append to the local CommitLog
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
                this.byteBufferRead.position(readSocketPos);
                this.dispatchPosition += msgHeaderSize + bodySize;
                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }
                continue;
            }
        }
        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }
        break;
    }
    return true;
}
```
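Each frame consists of an 8-byte master physical offset, a 4-byte body size and the body. The method checks that the master's offset matches the slave's max offset, appends each complete body to the CommitLog, and reports the new offset back to the master via reportSlaveMaxOffsetPlus(). An incomplete frame stays in the buffer until more data arrives.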
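The framing can be shown in isolation. This sketch mirrors the header/body loop of dispatchReadRequest under stated assumptions: FrameDispatcher is hypothetical, a ByteArrayOutputStream stands in for the CommitLog (so the slave's max offset is its size, starting at 0), and buffer compaction (reallocateByteBuffer) and offset reporting are left out.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

class FrameDispatcher {
    static final int HEADER_SIZE = 8 + 4; // phyOffset + size
    final ByteArrayOutputStream commitLog = new ByteArrayOutputStream();
    int dispatchPosition = 0;

    // buf is in write mode: received data occupies [0, buf.position())
    boolean dispatch(ByteBuffer buf) {
        while (true) {
            int diff = buf.position() - dispatchPosition;
            if (diff < HEADER_SIZE) {
                return true; // header incomplete: wait for more data
            }
            long masterPhyOffset = buf.getLong(dispatchPosition);
            int bodySize = buf.getInt(dispatchPosition + 8);
            // The master's offset must match what the slave already holds
            if (commitLog.size() != 0 && commitLog.size() != masterPhyOffset) {
                return false;
            }
            if (diff < HEADER_SIZE + bodySize) {
                return true; // body incomplete: wait for more data
            }
            for (int i = 0; i < bodySize; i++) {
                commitLog.write(buf.get(dispatchPosition + HEADER_SIZE + i));
            }
            dispatchPosition += HEADER_SIZE + bodySize;
        }
    }
}
```

For example, a buffer holding frames (offset 0, "abc") and (offset 3, "de") followed by 5 bytes of a third header appends "abcde" and leaves dispatchPosition at 29, the end of the second frame.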
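HAConnection implementation
Finally, the master side of the connection. When the master accepts a connection from a slave, it wraps the SocketChannel in an HAConnection object, which performs the reads from and writes to that slave.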
```java
// Owning HAService
private final HAService haService;
// Network socket channel
private final SocketChannel socketChannel;
// Client (slave) address
private final String clientAddr;
// Service that writes data to the slave
private WriteSocketService writeSocketService;
// Service that reads data (offset reports) from the slave
private ReadSocketService readSocketService;
// Offset from which the slave requested data
private volatile long slaveRequestOffset = -1;
// Offset the slave has acknowledged as received
private volatile long slaveAckOffset = -1;
```
The most important parts are the write service and the read service.
First, ReadSocketService.
It is an inner class of HAConnection and extends ServiceThread.
Its fields are:
```java
// Read buffer size
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
// Selector
private final Selector selector;
// Network channel
private final SocketChannel socketChannel;
// Read buffer
private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// Current processing position
private int processPosition = 0;
// Timestamp of the last successful read
private volatile long lastReadTimestamp = System.currentTimeMillis();
```
The constructor takes only the socketChannel:

```java
public ReadSocketService(final SocketChannel socketChannel) throws IOException {
    this.selector = RemotingUtil.openSelector();
    this.socketChannel = socketChannel;
    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
    this.setDaemon(true);
}
```
Its run method keeps reading the offsets reported by the slave:

```java
while (!this.isStopped()) {
    try {
        // Wait up to 1 s
        this.selector.select(1000);
        // Handle reads
        boolean ok = this.processReadEvent();
        if (!ok) {
            HAConnection.log.error("processReadEvent error");
            break;
        }
        // Housekeeping: stop if the slave has been silent for too long
        long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
            - this.lastReadTimestamp;
        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
            .getHaHousekeepingInterval()) {
            log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr
                + "] expired, " + interval);
            break;
        }
    } catch (Exception e) {
        HAConnection.log.error(this.getServiceName() + " service has exception.", e);
        break;
    }
}
```
The read handling itself is in processReadEvent():

```java
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    // Buffer full: start over from the beginning
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPosition = 0;
    }
    // Read while there is free space
    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                // At least 8 unprocessed bytes: a full offset report is available
                if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                    // End of the last complete 8-byte offset
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPosition = pos;
                    // Record the slave's ACK
                    HAConnection.this.slaveAckOffset = readOffset;
                    if (HAConnection.this.slaveRequestOffset < 0) {
                        HAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                    }
                    // Wake requests waiting in GroupTransferService
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }
    return true;
}
```
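The main job of this method is to receive the ACK offsets reported by the slave and notify the waiting producer threads. The first offset received also becomes slaveRequestOffset, which tells the write service where the slave wants to start.
As in HAClient, if read returns 0 bytes three times in a row, the current read pass ends.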
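The pos arithmetic works because the slave only ever sends 8-byte offsets, so complete reports end at multiples of 8 from the start of the buffer. A sketch of just that step: AckParser is a hypothetical class, and returning -1 when no complete report is available is an assumption of the sketch. Older reports received in the same read are skipped, since only the newest ACK matters.

```java
import java.nio.ByteBuffer;

class AckParser {
    int processPosition = 0;

    // Returns the newest complete 8-byte offset, or -1 if fewer than 8 new bytes
    long latestAck(ByteBuffer byteBufferRead) {
        if (byteBufferRead.position() - processPosition < 8) {
            return -1;
        }
        // Round position() down to a multiple of 8: end of the last complete offset
        int pos = byteBufferRead.position() - (byteBufferRead.position() % 8);
        long readOffset = byteBufferRead.getLong(pos - 8);
        processPosition = pos;
        return readOffset;
    }
}
```

For example, after the bytes of offsets 100 and 200 plus 3 bytes of a third offset have arrived, position() is 19, pos is 16, and the ACK read is 200; the partial third offset is picked up once its remaining 5 bytes arrive.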
Next is WriteSocketService.
Its fields are:
```java
// Selector
private final Selector selector;
// Network socket channel
private final SocketChannel socketChannel;
// Header length: 8-byte offset + 4-byte size
private final int headerSize = 8 + 4;
private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);
// Physical offset of the next transfer
private long nextTransferFromWhere = -1;
// Result of looking up messages by offset
private SelectMappedBufferResult selectMappedBufferResult;
// Whether the previous transfer has finished
private boolean lastWriteOver = true;
// Timestamp of the last write
private long lastWriteTimestamp = System.currentTimeMillis();
```
Then its run method (an excerpt from the loop body):

```java
// Wait up to 1 s
this.selector.select(1000);
// slaveRequestOffset == -1: the master has not yet received a pull request from the slave,
// so skip this round (this is why ReadSocketService sets it on the first offset report)
if (-1 == HAConnection.this.slaveRequestOffset) {
    Thread.sleep(10);
    continue;
}
// First transfer: decide where to start
if (-1 == this.nextTransferFromWhere) {
    if (0 == HAConnection.this.slaveRequestOffset) {
        // Slave is empty: start at the beginning of the master's last CommitLog file
        long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
        masterOffset = masterOffset
            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
            .getMappedFileSizeCommitLog());
        if (masterOffset < 0) {
            masterOffset = 0;
        }
        this.nextTransferFromWhere = masterOffset;
    } else {
        // Otherwise start from the offset the slave requested
        this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
    }
    log.info("master transfer data from " + this.nextTransferFromWhere + " to slave["
        + HAConnection.this.clientAddr + "], and slave request " + HAConnection.this.slaveRequestOffset);
}
```
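Each iteration waits up to 1 s in select.
On the first transfer, the start offset is computed: if slaveRequestOffset is 0 (the slave holds no data), transfer starts at the beginning of the master's last CommitLog file, i.e. the master's max offset rounded down to a multiple of mappedFileSizeCommitLog; otherwise it starts from the offset the slave requested.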
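The start-offset rule is plain arithmetic. A sketch with a hypothetical TransferStart class; mappedFileSize stands for mappedFileSizeCommitLog, and the small numbers in the test are illustrative, not RocketMQ defaults.

```java
class TransferStart {
    static long firstTransferOffset(long slaveRequestOffset, long masterMaxOffset, long mappedFileSize) {
        if (slaveRequestOffset == 0) {
            // Empty slave: round down to the start of the master's last CommitLog file
            long masterOffset = masterMaxOffset - (masterMaxOffset % mappedFileSize);
            return Math.max(masterOffset, 0);
        }
        // Otherwise resume exactly where the slave says it is
        return slaveRequestOffset;
    }
}
```

With a file size of 1000 and a master max offset of 2500, an empty slave starts at 2000, so it receives only the current file rather than the master's whole history.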
```java
// Has the previous transfer finished?
if (this.lastWriteOver) {
    long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
        - this.lastWriteTimestamp;
    // If so, send a heartbeat (a header with size 0) once the heartbeat interval has passed
    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
        .getHaSendHeartbeatInterval()) {
        // Build Header
        this.byteBufferHeader.position(0);
        this.byteBufferHeader.limit(headerSize);
        this.byteBufferHeader.putLong(this.nextTransferFromWhere);
        this.byteBufferHeader.putInt(0);
        this.byteBufferHeader.flip();
        this.lastWriteOver = this.transferData();
        if (!this.lastWriteOver)
            continue;
    }
}
// ...
```
Finally, the message data is sent:

```java
// Look up readable messages starting at the pull offset
SelectMappedBufferResult selectResult =
    HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
    // Messages found
    int size = selectResult.getSize();
    // Cap the batch at haTransferBatchSize
    if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
        size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
    }
    // The cap may cut a message in two; the rest is sent in the next batch
    long thisOffset = this.nextTransferFromWhere;
    // Advance the next pull offset
    this.nextTransferFromWhere += size;
    selectResult.getByteBuffer().limit(size);
    this.selectMappedBufferResult = selectResult;
    // Build Header
    this.byteBufferHeader.position(0);
    this.byteBufferHeader.limit(headerSize);
    this.byteBufferHeader.putLong(thisOffset);
    this.byteBufferHeader.putInt(size);
    this.byteBufferHeader.flip();
    this.lastWriteOver = this.transferData();
    // ...
}
```
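Both the heartbeat and the data transfer use the same 12-byte header: an 8-byte start offset and a 4-byte body size, with size 0 meaning heartbeat. A sketch of the header logic: TransferHeader is a hypothetical class, batchSize stands for haTransferBatchSize, and the 32 KB value in the test is illustrative only.

```java
import java.nio.ByteBuffer;

class TransferHeader {
    static final int HEADER_SIZE = 8 + 4;
    private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(HEADER_SIZE);
    long nextTransferFromWhere;

    TransferHeader(long start) {
        this.nextTransferFromWhere = start;
    }

    // Heartbeat: current offset, empty body
    ByteBuffer heartbeat() {
        return build(nextTransferFromWhere, 0);
    }

    // Data: cap the size at batchSize, then advance the offset for the next round
    ByteBuffer dataHeader(int availableBytes, int batchSize) {
        int size = Math.min(availableBytes, batchSize);
        long thisOffset = nextTransferFromWhere;
        nextTransferFromWhere += size;
        return build(thisOffset, size);
    }

    // The header buffer is reused, so each result is valid until the next call
    private ByteBuffer build(long offset, int size) {
        byteBufferHeader.position(0);
        byteBufferHeader.limit(HEADER_SIZE);
        byteBufferHeader.putLong(offset);
        byteBufferHeader.putInt(size);
        byteBufferHeader.flip();
        return byteBufferHeader;
    }
}
```

With 50,000 bytes available and a 32,768-byte cap, the first header announces 32,768 bytes and the second announces the remaining 17,232 bytes from the advanced offset. This is also why the slave can receive a partial message: the master splits raw CommitLog bytes, not message boundaries.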
That is the complete send and receive flow.