In this post, let's talk about ACKs in Kafka.

An ACK concerns what happens when the Kafka server receives a produce request: because a partition in Kafka has multiple replicas, the server has to decide whether a message that arrives is considered done once it is stored on the leader replica alone, or only after it has been synchronized to the followers.

This gives rise to different persistence strategies.

The common ACK persistence strategies are:

1. Only the leader persists the message

2. A majority of the followers have persisted it

3. All followers have persisted it

However, Kafka does not adopt any of these strategies directly. Instead, it builds on them and introduces the concept of the ISR,

that is, the In-Sync Replicas: the replicas that are kept in sync,

meaning the replica nodes that can still keep up with the leader's replication pace, the leader itself included.
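You can inspect the ISR of each partition at any time with the describe option of kafka-topics.sh (the ZooKeeper address and topic name below are just placeholders matching the example used later in this post); the output lists Leader, Replicas and Isr for every partition:

bin/kafka-topics.sh --describe --zookeeper 192.168.0.2:2181/kafka100 --topic topic-test1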

To make ACKs reliable while still giving users some choice,

Kafka offers several ACK acknowledgement levels.

They are configured on the KafkaProducer via the acks setting, with the values 0, 1, and all.

With acks=0, the KafkaProducer on the client side just fires the message off and does not care whether it was ever written to disk; it never learns whether the send actually succeeded.

With acks=1, the write counts as successful as soon as the partition leader has received the message and written it.

With acks=all, the leader must wait until every node in the ISR has replicated the message before the write is considered complete.

Note that acks=all also interacts with the configured minimum ISR size (min.insync.replicas): if fewer than that many ISR members are available, the write is rejected.
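As a minimal producer-side sketch of these settings (the broker address is a placeholder; the topic name matches the one created later in this post):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.2:9092") // placeholder broker address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// acks: "0" = fire and forget, "1" = leader write is enough, "all" = wait for the whole ISR
props.put(ProducerConfig.ACKS_CONFIG, "all")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("topic-test1", "key", "value"))
producer.close()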

Having covered the basics of ACKs, we need to look at a follow-up question: the code behind the ISR, how the ISR and OSR are determined,

and how the replicas in the ISR are kept in sync.

First, the ISR is a set of replicas, determined by the partition and, above it, the topic,

so ISR initialization should take place when the topic itself is initialized.

A topic can be created automatically, which is controlled by auto.create.topics.enable in the broker configuration: if it is true, then as soon as a message is sent to a topic that does not exist yet, a topic with 1 partition and 1 replica is created. Automatic topic creation is generally not recommended, though, because it makes topics hard to manage.
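For reference, a sketch of the broker-side settings involved here (server.properties); the partition count and replication factor used for an auto-created topic come from num.partitions and default.replication.factor:

# disable implicit topic creation on first use
auto.create.topics.enable=false
# values applied when a topic is auto-created
num.partitions=1
default.replication.factor=1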

Alternatively, a topic can be created with Kafka's kafka-topics.sh script, which is the route we will take here:

bin/kafka-topics.sh --create --zookeeper 192.168.0.2:2181/kafka100 --topic topic-test1 --replication-factor 2 --partitions 4

The shell script itself contains just one line of code:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

so it simply runs kafka.admin.TopicCommand.

That takes us into the Kafka source code, where the work is done by the createTopic method:

//note: create a topic
def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  val topic = opts.options.valueOf(opts.topicOpt)
  val configs = parseTopicConfigsToBeAdded(opts)
  val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
  if (Topic.hasCollisionChars(topic))
    println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
  try {
    if (opts.options.has(opts.replicaAssignmentOpt)) { //note: a replica assignment was specified, just write it to ZK
      val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
    } else { //note: no replica assignment specified, run the automatic assignment algorithm
      CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
      val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
      val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
      val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
                          else RackAwareMode.Enforced
      AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
    }
    println("Created topic \"%s\".".format(topic))
  } catch {
    case e: TopicExistsException => if (!ifNotExists) throw e
  }
}

If no replica assignment is specified, execution continues to

AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)

The next step is the even distribution of partitions and replicas:

val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)

This assignment has two goals:

spread the replicas as evenly as possible across all brokers;

place the replicas of each partition on different brokers wherever possible.

To achieve this, assignReplicasToBrokers first picks a random starting broker and assigns the first replica of each partition in round-robin order; the remaining replicas are then placed by stepping through the broker ids with an increasing shift.

def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  if (nPartitions <= 0) //note: the number of partitions to add must be greater than 0
    throw new InvalidPartitionsException("number of partitions must be larger than 0")
  if (replicationFactor <= 0) //note: the replication factor must be greater than 0
    throw new InvalidReplicationFactorException("replication factor must be larger than 0")
  if (replicationFactor > brokerMetadatas.size) //note: the replication factor exceeds the broker count
    throw new InvalidReplicationFactorException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
  if (brokerMetadatas.forall(_.rack.isEmpty)) //note: rack awareness is not enabled
    assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
      startPartitionId)
  else { //note: the rack-aware case
    if (brokerMetadatas.exists(_.rack.isEmpty)) //note: not all brokers carry rack information
      throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
    assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
      startPartitionId)
  }
}

Rack awareness is not our focus here, so let's keep going. The actual partition and replica assignment is done by the following code:

private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length) //note: pick a random broker to start from
  var currentPartitionId = math.max(0, startPartitionId) //note: the first partition to assign
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) { //note: assign each partition in turn
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1 //note: prevents partitions from getting exactly the same (leader, follower) layout when there are many partitions
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length //note: the partition's first replica
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1) //note: assign the remaining replicas
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}

//note: once a partition's first replica is placed, this computes where the other replicas go
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1) //note: add replicaIndex on top of secondReplicaShift
  (firstReplicaIndex + shift) % nBrokers
}

To make the code above concrete, suppose there are 5 brokers and a topic is created with 10 partitions and 3 replicas each.

Assume startIndex and nextReplicaShift both start at 0.

For partition 0, the first replica goes to (0+0)%5=0, the second to (0+(1+(0+0)%4))%5=1, and the third to (0+(1+(0+1)%4))%5=2;

for partition 2, the first replica goes to (0+2)%5=2, the second to (2+(1+(0+0)%4))%5=3, and the third to (2+(1+(0+1)%4))%5=4;

for partition 5, the first replica goes to (0+5)%5=0, the second to (0+(1+(1+0)%4))%5=2, and the third to (0+(1+(1+1)%4))%5=3 (once the partition id reaches a multiple of the broker count, nextReplicaShift is incremented by 1);

for partition 8, the first replica goes to (0+8)%5=3, the second to (3+(1+(1+0)%4))%5=0, and the third to (3+(1+(1+1)%4))%5=1.
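The table above can be reproduced with a small standalone sketch of the rack-unaware assignment (this is my own re-implementation of the logic, with startIndex and nextReplicaShift fixed at 0 instead of being random, so the output matches the walkthrough):

import scala.collection.mutable

object AssignmentDemo {
  // same index arithmetic as replicaIndex in AdminUtils
  def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

  def main(args: Array[String]): Unit = {
    val brokers = Array(0, 1, 2, 3, 4) // 5 brokers
    val nPartitions = 10
    val replicationFactor = 3
    val startIndex = 0                 // fixed instead of random
    var nextReplicaShift = 0

    for (p <- 0 until nPartitions) {
      // bump the shift each time the partition id wraps around the broker list
      if (p > 0 && p % brokers.length == 0) nextReplicaShift += 1
      val firstReplicaIndex = (p + startIndex) % brokers.length
      val replicas = mutable.ArrayBuffer(brokers(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicas += brokers(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokers.length))
      println(s"partition $p -> replicas ${replicas.mkString(",")}") // e.g. partition 0 -> replicas 0,1,2
    }
  }
}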

After that, the resulting partition assignment and topic information are written to ZooKeeper;

the configuration and the assignment are written separately:

writeTopicConfig writes the topic configuration to /config/topics/TopicName,

and writeTopicPartitionAssignment writes the assignment to /brokers/topics/TopicName.
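To give a feel for what ends up in ZooKeeper, the two znodes are roughly of this shape (the JSON layout is from memory and may differ slightly between Kafka versions, and the assignment shown is just an illustrative one for the 4-partition, 2-replica topic above):

/config/topics/topic-test1   ->  {"version":1,"config":{}}
/brokers/topics/topic-test1  ->  {"version":1,"partitions":{"0":[0,1],"1":[1,2],"2":[2,3],"3":[3,4]}}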

At this point the topic creation command can return to the user,

while the Kafka Controller, which watches ZooKeeper,

picks up the change and performs the actual creation. The corresponding handling in the controller starts with:

val newTopics = currentChildren -- controllerContext.allTopics

First the newly added topics are determined,

then the replica assignment for those new topics is loaded:

val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)

Then we check whether there actually are any new topics:

if (newTopics.nonEmpty) //note: handle newly created topics
  controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)

The new partitions are then handled

in KafkaController's onNewPartitionCreation() method:

def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
  info("New partition creation callback for %s".format(newPartitions.mkString(",")))
  partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
  partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}

This walks the new partitions through a series of state changes.

A partition can be in one of four states (a small sketch of the legal transitions follows the list):

NonExistentPartition: the partition does not exist;

NewPartition: the partition has just been created and has replicas assigned, but no leader or ISR yet;

OnlinePartition: the partition has an elected leader and is in its normal working state;

OfflinePartition: the partition's leader is down.
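As a rough sketch of the legal transitions between these states (modelled on the assertValidPreviousStates checks quoted below; the entries for OfflinePartition and NonExistentPartition are from memory of the broker source and may vary by version):

sealed trait PartitionState
case object NonExistentPartition extends PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

// target state -> states a partition is allowed to come from
val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map(
  NewPartition         -> Set(NonExistentPartition),
  OnlinePartition      -> Set(NewPartition, OnlinePartition, OfflinePartition),
  OfflinePartition     -> Set(NewPartition, OnlinePartition, OfflinePartition),
  NonExistentPartition -> Set(OfflinePartition)
)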

Let's go through the calls in onNewPartitionCreation one by one:

1. partitionStateMachine.handleStateChanges(newPartitions, NewPartition): creates the Partition objects and puts them into the NewPartition state;

2. replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica): creates the Replica objects and puts them into the NewReplica state;

3. partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector): moves the partitions from NewPartition to OnlinePartition;

4. replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica): moves the replicas to the OnlineReplica state.

Step one creates the Partition object:

case NewPartition =>
  //note: create a new partition
  assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
  partitionState.put(topicAndPartition, NewPartition) //note: cache the partition state
  val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
  stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, assignedReplicas))

Then a Replica object is created for each replica of the partition and put into the NewReplica state:

case NewReplica =>
  assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState) //note: validate the previous state
  // start replica as a follower to the current leader for its partition
  val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
  leaderIsrAndControllerEpochOpt match {
    case Some(leaderIsrAndControllerEpoch) =>
      if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) //note: a replica in this state cannot be the leader
        throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
          .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
      //note: send a LeaderAndIsr request to this replicaId; the same batch also sends UpdateMetadata requests to all brokers
      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
        topic, partition, leaderIsrAndControllerEpoch,
        replicaAssignment)

So a request is sent to each of the replicaIds.

After that comes the OnlinePartition transition:

case OnlinePartition =>
  assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
  partitionState(topicAndPartition) match {
    case NewPartition =>
      // initialize leader and isr path for new partition
      initializeLeaderAndIsrForPartition(topicAndPartition) //note: initialize the leader and ISR for the new partition
    case OfflinePartition =>
      electLeaderForPartition(topic, partition, leaderSelector)
    case OnlinePartition => // invoked when the leader needs to be re-elected
      electLeaderForPartition(topic, partition, leaderSelector)
    case _ => // should never come here since illegal previous states are checked above
  }

For a new partition, initializeLeaderAndIsrForPartition initializes the leader and ISR and writes that information to ZooKeeper:

the first replica in the assignment becomes the leader, and all replicas join the ISR.

A LeaderAndIsr request is then sent to all replicas, and an UpdateMetadata request to all brokers.
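A minimal sketch of that leader/ISR initialization rule (not the actual Kafka code; the helper name and the live-broker filter are my own simplification, and it assumes at least one assigned replica is alive):

case class LeaderAndIsr(leader: Int, isr: Seq[Int])

// the first live replica in the assignment becomes leader, and all live replicas form the ISR
def initialLeaderAndIsr(assignedReplicas: Seq[Int], liveBrokers: Set[Int]): LeaderAndIsr = {
  val liveReplicas = assignedReplicas.filter(liveBrokers.contains)
  LeaderAndIsr(leader = liveReplicas.head, isr = liveReplicas)
}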

Finally, each replica's state is moved from NewReplica to OnlineReplica.

With that, the creation of the topic is complete.
