因为Kafka为了保证可用性,采用了副本机制,将所有的分区副本均匀的分配到所有的broker上,并从中挑选一个作为leader去对外提供服务,follower只负责同步备份数据。

但是并不能保证所有的follower都能完全跟上leader,肯定存在部分落后进度太多的follower,对于这些follower,是不能允许其参与leader竞选的。故引入了ISR的概念。

ISR就是Kafak动态维护的一组同步副本集合,每个分区都有自己的ISR列表,ISR中的副本都是可以跟上leader的。

而两者同步过程,是下面讲解的重点

对于leader-follower的同步,有一些重要概念,

起始位移 base offset 第一条消息的offset

高水位 high watermark 高水印值,保存了副本最新一条已经可以读取的消息的位移,对于超过HW值得消息,都将视为未提交成功的,consumer因此看不到。

日志末端位移 log end offset,LEO 副本中下一条待写入的offset,每当leader副本接收到producer端推送的消息的时候,就会更新自己的LEO

在上面过程中,和用户息息相关的值是HW,水位值。其前进更新的流程为,leader接收到了producer推送的消息,就会尝试更新LEO,在ISR副本都更新LEO后,producer上的HW值才会前移,表示更新成功。消费端才可以读取到HW线下的消息。

对于HW的更新详细细节我们会在接下来讲解HW和leader epoch的时候讲解,我们先看看ISR的具体设计。

ISR在0.9版本之前,提供了两个参数来控制ISR的同步,分别是replica.lag.max.messages和replica.lag.time.max.ms两个参数

一个是控制follower能够落后leader副本的消息数,一旦超过了消息数,就会被提出ISR。

一个是控制follower向leader请求消息的间隔,一旦超过了设定时间没有向leader发起Fetch请求。那么也会被踢出ISR

但实际上replica.lag.max.messages这个参数是具有一定问题的,比如我们设置这个值为4,意味着允许落后四个消息,而消息生产的消息的速度是2条/秒,一般情况是可以的,但是就怕在整个使用周期中,存在着峰值,比如一次消息就发送超过4个消息,那么follower就会被直接踢出ISR。但是这时候follower其实本身没问题,只是碍于配置项,无法加入ISR。

而且这个值是全局的,顾因此可能存在集群中多个topic,不同的消费速率,导致有的topic的ISR经常落后的问题。

于是在0.9版本之后,就去掉了replica.lag.max.messages参数,只用replica.lag.time.max.ms来检测落后副本的时间多少,如果follower副本落后leader的时间持续性地超过了这个参数值,那么这个副本才是不同步的。

上面我们提到了水印机制,我们会详细介绍水印的同步机制,以及其替代者 leader-epoch

水印在kafka中,是为了保证高可用的机制,通过水印,来保证用户读取到的数据都是高可用的。

而HW,是每个副本上都具有的一个属性

每一个副本上都具有两个属性,分别是LEO和HW,LEO是日志末端位移,记录这个副本中日志文件的下一条消息的位移值,比如有10条消息,对应的位移是0-9,那么LEO就是10,LEO指向的是下一条消息位置,也就是LEO所处位置,是没有消息的。

HW是另一个,每一个副本的HW都不大于LEO值,小于等于HW的值都会被认为是已提交的,或者已备份的。

对于LEO的更新,当leader副本收到producer的请求时候,就会自动更新自己的LEO值。

而Follower具有两个LEO值,一个是保存在follower所在broker上,一个放在leader的broker上,用于让leader记录follower获取到的消息位置,称为remote LEO。

对于remote LEO,是在leader收到follower的FETCH的时候,返回自己log中最新消息之前,更新的remote LEO,而follower本地的LEO,则是从leader收到消息后更新本地的LEO。

HW的更新机制,对于HW这个值,Leader和follower都是只具有一个,具体实际则发生在更新LEO之后,一旦follower向log写完数据,就会尝试更新本地的HW,取得是FETCH响应中的leader HW值和当前的LEO值进行min比较,取两者的小值来作为HW,来确保follower的HW不会超过leader的HW。

而leader的HW,其由于直接影响到分区数据的可见性,所以更新实际有以下四种

1.副本成为新的leader副本

2.broker出现崩溃从而踢出ISR时

3.producer向着leader写入消息

4.leader处理FETCH请求

上面的触发实际最常见的有2个,分别是producer的写入和leader处理Fetch,而在这两个触发条件达到了之后,其会选择所有满足条件的副本,包括ISR中,以及满足落后时间不大于replica.lag.time.max.ms参数的副本。确定所有满足条件的副本的LEO,取其中最小的LEO作为HW值。

更为具体的状态为,假设我们有一个topic,一个分区,两个副本。

初始情况下,leader和follower的HW和LEO都是0,不过leader的remote LEO是leader段保存的follower LEO,这时候也是0,

然后leader收到一条消息,leader写入消息之后,收到了follower发送的FETCH请求。

这时候LEO为1,然后leader的处理逻辑为更新remote LEO,然后更新分区的HW,不过这时候remote LEO为0,因为follower发来的FETCH请求中的fetch offset说明了尚未存入。

更新分区的HW,因为remote LEO为0,所以和leader LEO=1进行了min的对比后,确定HW为0

之后follower收到了FETCH的response之后,会写入本地log同时更新follower LEO,然后更新follower的HW,比较本地LEO和当前leader HW后取较小值,不过这时候leader HW为0,所以follower HW也只能为0

这时候第一次FETCH结束,follower会马不停蹄的开启第二次FETCH操作。这时候如果没有消息进入,leader会在等待一段时间后,返回response,不过在返回的时候,更新remote LEO为1,因为request中携带了fetch offset为1,然后更新HW,这时候leader LEO和follower LEO都为1,所以分区HW值为1

这样就前移了HW,不过上述流程暴露了副本备份的缺陷,因为HW的更新往往需要另外一轮的FETCH请求才能完成。

所以可能存在一些问题,诸如备份数据丢失,备份数据不一致。

对于数据丢失,因为HW更新在下一次RPC请求完成的,如果A和B都写入了数据之后,但还没有到下一次FETCH返回,这时候HW没有更新,如果A作为leader崩溃了,那么B会回滚offset,将新来的数据删除,这时候B再次成为了broker,即使A再次加入ISR,也会日志截断,将HW调整为1,这样上一条消息就会永久截断,永远丢失。

由于多次RPC可能带来的问题,Kafka引入了leader epoch来解决了这类问题。

其是一对值 epoch,offset。Epoch是leader的版本号,每次leader变更了,就会加一。而offset就是对应版本的消息位移,比如两个epoch值,(0,0)和(1,120),就是0版本写入了120条消息,而1版本从120开始写入。利用这样的方式,规避了诸如数据丢失的问题。

图片

依靠leader epoch来规避数据不一致的问题。

发表评论

邮箱地址不会被公开。 必填项已用*标注