High availability (HA) is a capability that almost every piece of middleware must provide.

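To improve message availability and avoid messages becoming unconsumable when a single Broker fails, RocketMQ (RMQ) supports master/slave Brokers. When a message arrives at the master, it is first replicated to the slave; if the master goes down, messages can still be pulled from the slave.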

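The main classes involved in HA are:

HAService: core class implementing RocketMQ master-slave replication

HAService$AcceptSocketService: on the HA master, listens for connections from clients (slaves)

HAService$GroupTransferService: notifies waiting requests when replication has progressed

HAService$HAClient: the HA client side implementation

HAConnection: wraps a master-side HA connection and its read/write handling

HAConnection$ReadSocketService: HA network read implementation

HAConnection$WriteSocketService: HA network write implementation

Starting from HAService, the HA mechanism works as follows: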

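(1) The master starts and listens on a dedicated port for slave connections.

(2) The slave actively connects to the master; the master accepts the connection and a TCP connection is established.

(3) The slave sends the master the offset from which it wants to pull; the master parses the request and returns messages to the slave.

(4) The slave stores the messages and sends a new replication request.

The overall flow is wired up in the start method:

// Start method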

public void start() throws Exception {

this.acceptSocketService.beginAccept();

this.acceptSocketService.start();

this.groupTransferService.start();

this.haClient.start();

}

First comes AcceptSocketService, an inner class of HAService that lets the master listen for slave connections.

Its fields are:

// Address the Broker listens on (local IP + port)

private final SocketAddress socketAddressListen;

// Server-side socket channel (NIO)

private ServerSocketChannel serverSocketChannel;

// Event selector

private Selector selector;

start() first calls beginAccept():

public void beginAccept() throws Exception {

// Open the server channel

this.serverSocketChannel = ServerSocketChannel.open();

// Open the event selector

this.selector = RemotingUtil.openSelector();

this.serverSocketChannel.socket().setReuseAddress(true);

// Bind the listening address

this.serverSocketChannel.socket().bind(this.socketAddressListen);

// Switch to non-blocking mode

this.serverSocketChannel.configureBlocking(false);

// Register interest in accept (new connection) events

this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);

}

start() then calls acceptSocketService.start(), which launches the thread, so let's look at its run method:

while (!this.isStopped()) {

try {

// Wait up to 1000 ms for events

this.selector.select(1000);

Set<SelectionKey> selected = this.selector.selectedKeys();

if (selected != null) {

for (SelectionKey k : selected) {

if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {

// Accept the new connection's channel

SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

if (sc != null) {

HAService.log.info("HAService receive new connection, "

+ sc.socket().getRemoteSocketAddress());

try {

// Create one HAConnection per channel

HAConnection conn = new HAConnection(HAService.this, sc);

// Start its read/write services

conn.start();

// Track the connection in HAService

HAService.this.addConnection(conn);

} catch (Exception e) {

log.error("new HAConnection exception", e);

sc.close();

}

}

} else {

log.warn("Unexpected ops in select " + k.readyOps());

}

}

selected.clear();

}

} catch (Exception e) {

log.error(this.getServiceName() + " service has exception.", e);

}

}

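The HAConnection objects created by AcceptSocketService above carry the master-slave data replication logic. In the loop, the selector waits up to 1 s for accept-ready events and creates an HAConnection for each new connection.

Next, GroupTransferService is started; it is also an inner class of HAService.

Its main job is to block the sender until replication completes: after a producer's message reaches the master and is flushed to disk, the request must wait until the new data has been transferred to the slave. Once the slave reports back, GroupTransferService returns the result to the producer.

Start with its core logic, doWaitTransfer: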

private void doWaitTransfer() {

// Lock the read queue so no other thread touches it; the read and write queues are exchanged by swapping

synchronized (this.requestsRead) {

if (!this.requestsRead.isEmpty()) {

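// Iterate over the pending requests

for (CommitLog.GroupCommitRequest req : this.requestsRead) {

// If the offset acknowledged by the slave has reached this request's next offset, replication is done

boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();

// Deadline: current time plus the configured sync flush timeout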

long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()

+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();

while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {

// Not yet replicated and not timed out: wait, then re-check

this.notifyTransferObject.waitForRunning(1000);

transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();

}

if (!transferOK) {

log.warn("transfer messsage to slave timeout, " + req.getNextOffset());

}

// Wake the waiting request with the resulting status

req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);

}

this.requestsRead.clear();

}

}

}

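That is the whole method.

Its core logic is checking whether the offset acknowledged by the slave has reached the offset produced by this request, waiting (with a timeout) until it does.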
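As a rough illustration of this wait-until-acknowledged pattern, here is a minimal, self-contained sketch built on plain wait/notifyAll. The class TransferWaiter and its methods are hypothetical names for illustration only, not RocketMQ APIs; the real code relies on its own waitForRunning helper.

```java
/** Hypothetical sketch: block until an acknowledged offset reaches a target, or time out. */
final class TransferWaiter {
    private long ackedOffset = 0;

    /** Records a newer acknowledged offset and wakes any waiting threads. */
    synchronized void ack(long offset) {
        if (offset > ackedOffset) {
            ackedOffset = offset;
            notifyAll();
        }
    }

    /** Returns true once ackedOffset >= target, or false if the timeout elapses first. */
    synchronized boolean await(long target, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (ackedOffset < target) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // timed out, comparable to FLUSH_SLAVE_TIMEOUT
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

A caller would invoke await(nextOffset, timeout) on the producer path and ack(offset) whenever the slave reports progress.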

// Called with the offset reported by the slave

public void notifyTransferSome(final long offset) {

for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {

// Try to raise the recorded offset

boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);

// On success, wake the waiting thread

if (ok) {

this.groupTransferService.notifyTransferSome();

break;

} else {

value = this.push2SlaveMaxOffset.get();

}

}

}

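This method is driven by the offset the slave reports back.

The nested call to groupTransferService.notifyTransferSome() then wakes the thread blocked in doWaitTransfer.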
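The compare-and-set loop above is a common way to keep a monotonically increasing maximum without locks. A minimal sketch of the same idea, using a hypothetical MaxOffsetTracker class (not part of RocketMQ):

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: lock-free "raise to at least this value" update. */
final class MaxOffsetTracker {
    private final AtomicLong maxOffset = new AtomicLong(0);

    /** Raises the stored maximum to offset if it is larger; returns true if this call updated it. */
    boolean advanceTo(long offset) {
        long current = maxOffset.get();
        while (offset > current) {
            if (maxOffset.compareAndSet(current, offset)) {
                return true;
            }
            // Another thread changed the value; re-read and retry if still smaller
            current = maxOffset.get();
        }
        return false;
    }

    long get() {
        return maxOffset.get();
    }
}
```

Stale or out-of-order reports are simply ignored, so the stored value never moves backwards.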

On the slave side, the work is done by HAClient,

which is also an inner class of HAService.

Its main fields are:

// Size of the socket read buffer (4 MB)

private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;

// Address of the master

private final AtomicReference<String> masterAddress = new AtomicReference<>();

// Buffer holding the slave offset to report (8 bytes)

private final ByteBuffer reportOffset = ByteBuffer.allocate(8);

// Network channel

private SocketChannel socketChannel;

// Selector

private Selector selector;

// Timestamp of the last write

private long lastWriteTimestamp = System.currentTimeMillis();

// Max commit log offset last reported by this slave

private long currentReportedOffset = 0;

// Position in the read buffer up to which data has been dispatched

private int dispatchPosition = 0;

// Read buffer

private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

// Backup of the read buffer

private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

@Override

public void run() {

log.info(this.getServiceName() + " service started");

// Loop until the service is stopped

while (!this.isStopped()) {

try {

// Try to connect to the master

if (this.connectMaster()) {

// Check whether it is time to report the current offset to the master

if (this.isTimeToReportOffset()) {

// Try to report it

boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);

if (!result) {

// Close the connection to the master

this.closeMaster();

}

}

// Wait up to 1 s for I/O events

this.selector.select(1000);

// Handle data read from the network

boolean ok = this.processReadEvent();

if (!ok) {

this.closeMaster();

}

if (!reportSlaveMaxOffsetPlus()) {

continue;

}

long interval =

HAService.this.getDefaultMessageStore().getSystemClock().now()

- this.lastWriteTimestamp;

if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()

.getHaHousekeepingInterval()) {

log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress

+ "] expired, " + interval);

this.closeMaster();

log.warn("HAClient, master not response some time, so close connection");

}

} else {

this.waitForRunning(1000 * 5);

}

} catch (Exception e) {

log.warn(this.getServiceName() + " service has exception. ", e);

this.waitForRunning(1000 * 5);

}

}

log.info(this.getServiceName() + " service end");

}

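The overall flow is: connect to the master -> decide whether the current offset needs to be reported to the master -> select events -> handle the data read from the network.

Let's go through the methods used in run one by one.

First, connecting to the master: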

private boolean connectMaster() throws ClosedChannelException {

// No connection yet

if (null == socketChannel) {

// Read the master address

String addr = this.masterAddress.get();

if (addr != null) {

// Connect to the remote address

SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);

if (socketAddress != null) {

this.socketChannel = RemotingUtil.connect(socketAddress);

if (this.socketChannel != null) {

this.socketChannel.register(this.selector, SelectionKey.OP_READ);

}

}

}

// Start from the local max physical offset

this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

// Record the current time

this.lastWriteTimestamp = System.currentTimeMillis();

}

// Report whether we are connected

return this.socketChannel != null;

}

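RemotingUtil is used to connect to the remote server.

The method also records the local maximum offset and returns whether the connection succeeded.

Next, the elapsed time is used to decide whether the offset needs to be reported to the master: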

private boolean isTimeToReportOffset() {

long interval =

HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;

boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()

.getHaSendHeartbeatInterval();

return needHeart;

}

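If the offset does need to be reported to the master:

// Takes the current max offset

private boolean reportSlaveMaxOffset(final long maxOffset) {

// Reset for writing, put the offset, then reset for reading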

this.reportOffset.position(0);

this.reportOffset.limit(8);

this.reportOffset.putLong(maxOffset);

this.reportOffset.position(0);

this.reportOffset.limit(8);

for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {

try {

this.socketChannel.write(this.reportOffset);

} catch (IOException e) {

log.error(this.getServiceName()

+ "reportSlaveMaxOffset this.socketChannel.write exception", e);

return false;

}

}

lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();

// True if the whole offset was written out

return !this.reportOffset.hasRemaining();

}

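The parameter is the local maximum offset. The return value reports whether the buffer has nothing left to write, that is, whether all 8 bytes were sent to the master.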
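The repeated position(0)/limit(8) calls amount to the usual clear, put, flip cycle of a ByteBuffer. A small sketch of encoding and decoding such an 8-byte report, using a hypothetical OffsetReportCodec class (not part of RocketMQ):

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: the slave's offset report is a single 8-byte long. */
final class OffsetReportCodec {
    /** Encodes the offset as 8 big-endian bytes, ready to be written to a channel. */
    static ByteBuffer encode(long maxOffset) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.clear();            // position = 0, limit = 8 (write mode)
        buf.putLong(maxOffset); // position advances to 8
        buf.flip();             // position = 0, limit = 8 (read mode)
        return buf;
    }

    /** Decodes a report without changing the buffer's position. */
    static long decode(ByteBuffer buf) {
        return buf.getLong(buf.position());
    }
}
```

After a complete channel write, hasRemaining() on the encoded buffer would be false, which is the condition the method above returns.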

Finally, read events are handled:

private boolean processReadEvent() {

int readSizeZeroTimes = 0;

// While the read buffer still has free space

while (this.byteBufferRead.hasRemaining()) {

try {

// Read data into byteBufferRead

int readSize = this.socketChannel.read(this.byteBufferRead);

if (readSize > 0) {

readSizeZeroTimes = 0;

// Dispatch the data into the local commit log (mapped files)

boolean result = this.dispatchReadRequest();

if (!result) {

log.error("HAClient, dispatchReadRequest error");

return false;

}

} else if (readSize == 0) {

// Give up after three consecutive empty reads

if (++readSizeZeroTimes >= 3) {

break;

}

} else {

log.info("HAClient, processReadEvent read socket < 0");

return false;

}

} catch (IOException e) {

log.info("HAClient, processReadEvent read socket exception", e);

return false;

}

}

return true;

}

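This method processes the data sent back by the master.

Whenever the number of bytes read is greater than 0, the data is handed to dispatchReadRequest().

Otherwise, after three consecutive empty reads, this round of reading ends and the method returns true.

If the number of bytes read is negative or an I/O exception occurs, it returns false.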

private boolean dispatchReadRequest() {

final int msgHeaderSize = 8 + 4; // phyoffset + size

int readSocketPos = this.byteBufferRead.position();

while (true) {

// Bytes received but not yet dispatched

int diff = this.byteBufferRead.position() - this.dispatchPosition;

if (diff >= msgHeaderSize) {

// Physical offset sent by the master

long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);

int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

if (slavePhyOffset != 0) {

if (slavePhyOffset != masterPhyOffset) {

log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "

+ slavePhyOffset + " MASTER: " + masterPhyOffset);

return false;

}

}

if (diff >= (msgHeaderSize + bodySize)) {

// The whole body has arrived

byte[] bodyData = new byte[bodySize];

// Read the body

this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);

this.byteBufferRead.get(bodyData);

// Append it to the local commit log

HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

this.byteBufferRead.position(readSocketPos);

this.dispatchPosition += msgHeaderSize + bodySize;

if (!reportSlaveMaxOffsetPlus()) {

return false;

}

continue;

}

}

if (!this.byteBufferRead.hasRemaining()) {

this.reallocateByteBuffer();

}

break;

}

return true;

}

This is where the received data is read and appended to the commitLog file.
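The framing handled here is a 12-byte header (8-byte physical offset plus 4-byte body size) followed by the body, and a frame is only consumed once it has fully arrived. The following sketch shows that parsing rule in isolation. HaFrameParser and Frame are hypothetical names, and the sketch collects frames in a list instead of appending them to a commit log; it also assumes body sizes are non-negative.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: extract complete [offset(8) | size(4) | body] frames from a buffer. */
final class HaFrameParser {
    static final int HEADER_SIZE = 8 + 4;

    /** Simple holder for one parsed frame. */
    static final class Frame {
        final long phyOffset;
        final byte[] body;

        Frame(long phyOffset, byte[] body) {
            this.phyOffset = phyOffset;
            this.body = body;
        }
    }

    // Index up to which buffered bytes have already been consumed
    private int dispatchPosition = 0;

    /** Returns every complete frame between dispatchPosition and buf.position(). */
    List<Frame> drain(ByteBuffer buf) {
        List<Frame> frames = new ArrayList<>();
        int writePos = buf.position();
        while (writePos - dispatchPosition >= HEADER_SIZE) {
            long phyOffset = buf.getLong(dispatchPosition);
            int bodySize = buf.getInt(dispatchPosition + 8);
            if (writePos - dispatchPosition < HEADER_SIZE + bodySize) {
                break; // body not fully received yet; wait for more data
            }
            byte[] body = new byte[bodySize];
            // Read through a duplicate so the write position of buf is left untouched
            ByteBuffer view = buf.duplicate();
            view.position(dispatchPosition + HEADER_SIZE);
            view.get(body);
            frames.add(new Frame(phyOffset, body));
            dispatchPosition += HEADER_SIZE + bodySize;
        }
        return frames;
    }
}
```

Using a duplicate view is a design choice of this sketch; the original code instead moves the buffer position and restores it afterwards.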

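How HAConnection works

Finally, the master-slave connection itself. When the master receives a connection request from a slave, it wraps the SocketChannel in an HAConnection object, which handles reads and writes between master and slave.

// The owning HAService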

private final HAService haService;

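// Network socket channel

private final SocketChannel socketChannel;

// Client address

private final String clientAddr;

// Service that writes data from the master to the slave

private WriteSocketService writeSocketService;

// Service that reads data sent by the slave

private ReadSocketService readSocketService;

// Offset from which the slave asked to pull data

private volatile long slaveRequestOffset = -1;

// Offset up to which the slave has acknowledged receiving data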

private volatile long slaveAckOffset = -1;

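The important parts are the write service and the read service.

First, ReadSocketService.

It is an inner class of HAConnection and extends ServiceThread.

Its fields include:

// Size of the network read buffer (1 MB)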

private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;

// Selector

private final Selector selector;

// Network channel

private final SocketChannel socketChannel;

// Network read buffer

private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

// Position up to which the buffer has been processed

private int processPosition = 0;

// Timestamp of the last read

private volatile long lastReadTimestamp = System.currentTimeMillis();

The constructor takes only the socketChannel:

public ReadSocketService(final SocketChannel socketChannel) throws IOException {

this.selector = RemotingUtil.openSelector();

this.socketChannel = socketChannel;

this.socketChannel.register(this.selector, SelectionKey.OP_READ);

this.setDaemon(true);

}

The run method then loops, continuously reading the requests sent by the slave:

while (!this.isStopped()) {

try {

// Wait up to 1 s on the selector

this.selector.select(1000);

// Handle the read request

boolean ok = this.processReadEvent();

if (!ok) {

HAConnection.log.error("processReadEvent error");

break;

}

long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;

if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {

log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);

break;

}

} catch (Exception e) {

HAConnection.log.error(this.getServiceName() + " service has exception.", e);

break;

}

}

The actual read handling lives in processReadEvent():

private boolean processReadEvent() {

int readSizeZeroTimes = 0;

// If the buffer has no free space left

if (!this.byteBufferRead.hasRemaining()) {

// Reset it so processing starts from the beginning

this.byteBufferRead.flip();

this.processPosition = 0;

}

// While the buffer still has free space

while (this.byteBufferRead.hasRemaining()) {

try {

// Number of bytes read

int readSize = this.socketChannel.read(this.byteBufferRead);

if (readSize > 0) {

readSizeZeroTimes = 0;

this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();

// At least 8 unprocessed bytes are available

if ((this.byteBufferRead.position() - this.processPosition) >= 8) {

// End position of the last complete 8-byte offset

int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);

long readOffset = this.byteBufferRead.getLong(pos - 8);

this.processPosition = pos;

// Record the slave's ack offset

HAConnection.this.slaveAckOffset = readOffset;

if (HAConnection.this.slaveRequestOffset < 0) {

HAConnection.this.slaveRequestOffset = readOffset;

log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);

}

// Wake up threads waiting for the transfer

HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);

}

} else if (readSize == 0) {

if (++readSizeZeroTimes >= 3) {

break;

}

} else {

log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");

return false;

}

} catch (IOException e) {

log.error("processReadEvent exception", e);

return false;

}

}

return true;

}

}

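The main purpose of this method is to receive the ACK offsets reported by the client (slave) and notify the waiting producer threads.

As on the client side, if a read returns 0 bytes three times in a row, the current read round ends.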
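The position arithmetic in processReadEvent picks the most recent complete 8-byte offset and skips any trailing partial report. A minimal sketch of just that calculation, using a hypothetical AckOffsetReader helper (not part of RocketMQ) and assuming at least 8 bytes have been written:

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: find the newest complete 8-byte offset in a buffer being filled. */
final class AckOffsetReader {
    /**
     * Returns the last complete long written into buf, ignoring up to 7
     * trailing bytes that belong to a report still in flight.
     * Assumes buf.position() >= 8.
     */
    static long lastCompleteOffset(ByteBuffer buf) {
        // Round the write position down to a multiple of 8
        int end = buf.position() - (buf.position() % 8);
        return buf.getLong(end - 8);
    }
}
```

With three full reports and three stray bytes in the buffer, the position is 27, the rounded end is 24, and the value read starts at index 16.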

Next is WriteSocketService.

Its fields include:

// Selector

private final Selector selector;

// Network socket channel

private final SocketChannel socketChannel;

// Length of the message header

private final int headerSize = 8 + 4;

private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);

// Physical offset of the next transfer

private long nextTransferFromWhere = -1;

// Result of looking up messages by offset

private SelectMappedBufferResult selectMappedBufferResult;

// Whether the previous transfer completed

private boolean lastWriteOver = true;

// Timestamp of the last write

private long lastWriteTimestamp = System.currentTimeMillis();

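Then the corresponding run method:

// Wait up to 1 s on the selector

this.selector.select(1000);

// slaveRequestOffset == -1 means the master has not yet received a pull request from the slave, so skip this round; this is why the read thread above updates slaveRequestOffset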

if (-1 == HAConnection.this.slaveRequestOffset) {

Thread.sleep(10);

continue;

}

// First transfer on this connection

if (-1 == this.nextTransferFromWhere) {

if (0 == HAConnection.this.slaveRequestOffset) {

// Take the master's max commit log offset

long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();

masterOffset =

masterOffset

- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()

.getMappedFileSizeCommitLog());

if (masterOffset < 0) {

masterOffset = 0;

}

// Use it as the offset of the next transfer

this.nextTransferFromWhere = masterOffset;

} else {

// Otherwise start from the offset the slave requested

this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;

}

log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr

+ "], and slave request " + HAConnection.this.slaveRequestOffset);

}

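The code above waits on the selector for up to 1 s per round.

On the first transfer it computes the starting offset: if the slave requested offset 0, the transfer starts at the beginning of the commitLog file that contains the master's maximum offset; otherwise it starts at the offset the slave requested.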
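The starting-offset rule is simple modular arithmetic. A worked sketch, using a hypothetical TransferStart helper (not part of RocketMQ) and an assumed commit log file size of 1 GB for the example values:

```java
/** Hypothetical sketch: where the master starts sending on a new connection. */
final class TransferStart {
    /**
     * If the slave asked for offset 0, start at the beginning of the file that
     * holds the master's max offset; otherwise honor the slave's requested offset.
     */
    static long initialOffset(long slaveRequestOffset, long masterMaxOffset, long mappedFileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset;
        }
        // Round down to a multiple of the commit log file size
        long start = masterMaxOffset - (masterMaxOffset % mappedFileSize);
        return Math.max(start, 0);
    }
}
```

For example, with a file size of 1073741824 bytes and a master max offset of 2684354560 (2.5 files), an empty slave starts at 2147483648, the first byte of the third file.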

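// Did the previous write complete?

if (this.lastWriteOver) {

// Time since the last write

long interval =

HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

// If so, check the elapsed time to decide whether a heartbeat should be sent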

if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()

.getHaSendHeartbeatInterval()) {

// Build Header

this.byteBufferHeader.position(0);

this.byteBufferHeader.limit(headerSize);

this.byteBufferHeader.putLong(this.nextTransferFromWhere);

this.byteBufferHeader.putInt(0);

this.byteBufferHeader.flip();

this.lastWriteOver = this.transferData();

if (!this.lastWriteOver)

continue;

}

Finally, the message data is sent:

// Look up readable messages starting at the transfer offset

SelectMappedBufferResult selectResult =

HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);

if (selectResult != null) {

// Messages were found

int size = selectResult.getSize();

// Cap the size at the maximum batch size per transfer

if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {

size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();

}

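// Note: capping the size above can leave the client with an incomplete message in this batch

long thisOffset = this.nextTransferFromWhere;

// Advance the offset for the next transfer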

this.nextTransferFromWhere += size;

selectResult.getByteBuffer().limit(size);

this.selectMappedBufferResult = selectResult;

// Build Header

this.byteBufferHeader.position(0);

this.byteBufferHeader.limit(headerSize);

this.byteBufferHeader.putLong(thisOffset);

this.byteBufferHeader.putInt(size);

this.byteBufferHeader.flip();

this.lastWriteOver = this.transferData();

That is the overall send and receive flow.
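Both the heartbeat and the data path build the same 12-byte header; a heartbeat is just a header whose body size is 0. A small sketch of that header construction and of the batch-size cap, using a hypothetical HaFrameHeader helper (not part of RocketMQ):

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: build the [offset(8) | size(4)] header sent before each batch. */
final class HaFrameHeader {
    static final int HEADER_SIZE = 8 + 4;

    /** Returns a header ready to be written; bodySize 0 represents a heartbeat. */
    static ByteBuffer build(long phyOffset, int bodySize) {
        ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
        header.putLong(phyOffset);
        header.putInt(bodySize);
        header.flip(); // switch to read mode: position = 0, limit = 12
        return header;
    }

    /** Limits one transfer to at most batchSize bytes. */
    static int cappedSize(int available, int batchSize) {
        return Math.min(available, batchSize);
    }
}
```

Because the cap is applied to a byte count rather than to message boundaries, a batch may end in the middle of a message, and the receiver has to append bytes by offset rather than by message.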
