In this post let's talk about ACKs in Kafka.
An ACK is the acknowledgment the Kafka server returns when it receives a produce request. Since a partition in Kafka has multiple replicas, the server has to decide when a message that has arrived counts as persisted: when it is stored only on the leader replica, or only after it has been synchronized to the followers.
This gives rise to different persistence strategies.
The common ACK persistence strategies are:
1. Only the leader persists the message
2. A majority of the followers have persisted it
3. All followers have persisted it
Kafka, however, does not use any of these strategies directly. Instead, it builds on them with the concept of the ISR,
that is, the In-Sync Replicas: the replicas that stay in sync,
meaning the replica nodes that can still keep up with the leader's replication pace, a set that also includes the leader itself.
To guarantee the reliability of ACKs while still giving users some choice,
Kafka provides several ACK levels.
They are configured on the KafkaProducer as 0, 1, or all.
With acks=0, the KafkaProducer on the client side only fires off the message; it does not wait for it to be persisted and does not care whether the send succeeded.
With acks=1, the write counts as successful as soon as the partition leader has received the message and written it.
With acks=all, after receiving the message the leader must wait until every node in the ISR has replicated it before the write is considered complete.
Note that acks=all also interacts with the configured minimum ISR size (min.insync.replicas).
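As a quick illustration, here is a minimal sketch of setting the acks level on a producer; the object name, broker address, topic name, key and value are placeholders for this example, not values taken from the article:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object AckConfigDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092") // placeholder broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // acks can be "0", "1" or "all"; "all" only gives a strong guarantee when the
    // topic/broker setting min.insync.replicas is raised above 1
    props.put("acks", "all")

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("topic-test1", "key", "value"))
    producer.close()
  }
}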
With the basics of ACKs covered, we need to look at a further question: the code behind the ISR, how the ISR and OSR are determined,
and how the replicas in the ISR are kept in sync.
First, the ISR is a set of replicas, and that set is determined by the partition and topic above it,
so the initialization of the ISR should happen when the topic is initialized.
A topic can be created automatically; this is controlled by auto.create.topics.enable in the Kafka configuration. If it is true, then the first time a message is sent to a topic that does not yet exist, a topic with 1 partition and 1 replica is created. Enabling automatic topic creation is not recommended, though, because it makes topics hard to manage.
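For reference, the relevant broker settings look roughly like this in server.properties (the keys are standard broker configs; the 1-partition/1-replica values mentioned above are the defaults of num.partitions and default.replication.factor):

# server.properties
# disable automatic topic creation, as recommended above
auto.create.topics.enable=false
# partition count and replication factor used when a topic is auto-created
num.partitions=1
default.replication.factor=1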
A topic can also be created with Kafka's kafka-topics.sh script, and that is the approach we'll take here:
bin/kafka-topics.sh --create --zookeeper 192.168.0.2:2181/kafka100 --topic topic-test1 --replication-factor 2 --partitions 4
Inside the shell script there is just one line:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
so all it really does is run Kafka's kafka.admin.TopicCommand.
This takes us into the Kafka source, where the work is handled by the createTopic method:
//note: create the topic
def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  val topic = opts.options.valueOf(opts.topicOpt)
  val configs = parseTopicConfigsToBeAdded(opts)
  val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
  if (Topic.hasCollisionChars(topic))
    println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
  try {
    if (opts.options.has(opts.replicaAssignmentOpt)) { //note: a replica assignment was specified explicitly, just write it to ZK
      val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
    } else { //note: no replica assignment was specified, run the automatic assignment algorithm
      CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
      val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
      val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
      val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
                          else RackAwareMode.Enforced
      AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
    }
    println("Created topic \"%s\".".format(topic))
  } catch {
    case e: TopicExistsException => if (!ifNotExists) throw e
  }
}
If no replica assignment was specified, execution falls through to
AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
and the next step is the even distribution of partitions and replicas:
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
This assignment pursues two goals:
1. spread the replicas as evenly as possible across all brokers;
2. place the replicas of each partition on different brokers whenever possible.
To achieve this, assignReplicasToBrokers first picks a random broker as a starting point and assigns each partition's first replica in round-robin fashion; the remaining replicas are then placed by progressively incrementing the broker index.
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  if (nPartitions <= 0) //note: the number of partitions to create must be greater than 0
    throw new InvalidPartitionsException("number of partitions must be larger than 0")
  if (replicationFactor <= 0) //note: the replication factor must be greater than 0
    throw new InvalidReplicationFactorException("replication factor must be larger than 0")
  if (replicationFactor > brokerMetadatas.size) //note: the replication factor exceeds the number of brokers
    throw new InvalidReplicationFactorException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
  if (brokerMetadatas.forall(_.rack.isEmpty)) //note: rack awareness is not enabled
    assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex, startPartitionId)
  else { //note: the rack-aware case
    if (brokerMetadatas.exists(_.rack.isEmpty)) //note: not all brokers carry rack information
      throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
    assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex, startPartitionId)
  }
}
Rack awareness is not our focus here, so let's keep going. The code for the actual assignment of partitions and replicas is as follows:
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length) //note: pick a random broker as the starting point
  var currentPartitionId = math.max(0, startPartitionId) //note: the first partition to assign
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) { //note: assign each partition in turn
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1 //note: prevents some partitions from getting exactly the same (leader, follower) layout when there are many partitions
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length //note: the partition's first replica
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1) //note: assign the remaining replicas
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}

//note: once a partition's first replica has been placed, this computes where the remaining replicas go
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1) //note: add replicaIndex on top of secondReplicaShift
  (firstReplicaIndex + shift) % nBrokers
}
Let's walk through the code above with a concrete example: 5 brokers, a topic that needs 10 partitions, and 3 replicas per partition.
Assume startIndex and nextReplicaShift above are both 0.
For partition 0, the first replica lands on (0+0)%5=0, the second on (0+(1+(0+0)%4))%5=1, the third on (0+(1+(0+1)%4))%5=2;
for partition 2, the first replica lands on (0+2)%5=2, the second on (2+(1+(0+0)%4))%5=3, the third on (2+(1+(0+1)%4))%5=4;
for partition 5, the first replica lands on (0+5)%5=0, the second on (0+(1+(1+0)%4))%5=2, the third on (0+(1+(1+1)%4))%5=3 (once the partition count reaches a multiple of the broker count, nextReplicaShift is incremented by 1);
for partition 8, the first replica lands on (0+8)%5=3, the second on (3+(1+(1+0)%4))%5=0, the third on (3+(1+(1+1)%4))%5=1.
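To make the walk-through easy to verify, here is a small standalone sketch (not the Kafka source) that reproduces the rack-unaware assignment logic above, with startIndex and nextReplicaShift fixed to 0 instead of random so its output matches the example:

import scala.collection.mutable

object AssignmentDemo {
  def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

  def assign(nPartitions: Int, replicationFactor: Int, brokers: Seq[Int]): Map[Int, Seq[Int]] = {
    val ret = mutable.Map[Int, Seq[Int]]()
    val brokerArray = brokers.toArray
    val startIndex = 0            // fixed instead of random, for reproducibility
    var nextReplicaShift = 0      // fixed instead of random, for reproducibility
    var currentPartitionId = 0
    for (_ <- 0 until nPartitions) {
      if (currentPartitionId > 0 && currentPartitionId % brokerArray.length == 0)
        nextReplicaShift += 1
      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
      val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
      ret.put(currentPartitionId, replicaBuffer)
      currentPartitionId += 1
    }
    ret.toMap
  }

  def main(args: Array[String]): Unit = {
    // 5 brokers (ids 0..4), 10 partitions, replication factor 3
    assign(10, 3, 0 until 5).toSeq.sortBy(_._1).foreach { case (p, rs) =>
      println(s"partition $p -> replicas ${rs.mkString(",")}")
    }
  }
}

Running it prints, for example, partition 0 -> 0,1,2 and partition 5 -> 0,2,3, matching the hand calculation above.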
Next, the resulting partition assignment and the topic information are written to ZooKeeper.
The configuration and the assignment are written separately:
writeTopicConfig writes the topic configuration to /config/topics/TopicName;
writeTopicPartitionAssignment writes the assignment data to /brokers/topics/TopicName.
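For the topic created above, the data stored at these paths looks roughly like this (the overall structure is the standard layout of these znodes; the broker IDs in the assignment are purely illustrative, since the real values depend on the brokers in the cluster):

/config/topics/topic-test1:
{"version":1,"config":{}}

/brokers/topics/topic-test1:
{"version":1,"partitions":{"0":[0,1],"1":[1,2],"2":[2,3],"3":[3,4]}}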
With that, from the client's point of view, topic creation is done and the call returns to the user.
The Kafka Controller, which watches ZooKeeper,
then performs the actual topic creation; the corresponding logic lives in the KafkaController:
val newTopics = currentChildren -- controllerContext.allTopics
First it works out which topics are newly added,
then fetches the replica assignment for those new topics:
val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
After that it checks whether there are any new topics:
if (newTopics.nonEmpty) //note: handle the newly created topics
  controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
and the new partitions can then be handled,
in KafkaController's onNewPartitionCreation() method:
def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
  info("New partition creation callback for %s".format(newPartitions.mkString(",")))
  partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
  partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}
This drives the new partitions through a series of state changes.
A partition can be in one of four states:
NonExistentPartition: the partition does not exist;
NewPartition: the partition has just been created and has replicas assigned, but no leader or ISR yet;
OnlinePartition: the partition's leader has been elected and it is in its normal working state;
OfflinePartition: the partition's leader is down.
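As a rough sketch (not the Kafka source), these four states and the valid previous-state checks that we will see below in the assertValidPreviousStates calls could be modeled like this:

object PartitionStates {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // valid previous states, taken from the assertValidPreviousStates calls shown below:
  // NewPartition may only be entered from NonExistentPartition,
  // OnlinePartition from NewPartition, OnlinePartition or OfflinePartition
  val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map(
    NewPartition    -> Set(NonExistentPartition),
    OnlinePartition -> Set(NewPartition, OnlinePartition, OfflinePartition)
  )

  def isValidTransition(current: PartitionState, target: PartitionState): Boolean =
    validPreviousStates.get(target).exists(_.contains(current))
}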
Let's go through the calls in onNewPartitionCreation one by one:
1. partitionStateMachine.handleStateChanges(newPartitions, NewPartition): creates the Partition objects and puts them in the NewPartition state;
2. replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica): creates the Replica objects and puts them in the NewReplica state;
3. partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector): moves the Partition objects from NewPartition to the OnlinePartition state;
4. replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica): moves the Replica objects to the OnlineReplica state.
Step one creates the Partition objects:
case NewPartition =>
  //note: create a new partition
  assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
  partitionState.put(topicAndPartition, NewPartition) //note: cache the partition's state
  val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
  stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, assignedReplicas))
Then a Replica object is created for each of the partition's replicas and set to the NewReplica state:
case NewReplica =>
  assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState) //note: validation
  // start replica as a follower to the current leader for its partition
  val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
  leaderIsrAndControllerEpochOpt match {
    case Some(leaderIsrAndControllerEpoch) =>
      if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) //note: a replica in this state cannot be the leader
        throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
          .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
      //note: send a LeaderAndIsr request to all the replicaIds; this also sends an UpdateMetadata request to every broker
      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch, replicaAssignment)
This sends the LeaderAndIsr request to all the replica IDs.
After that the partition is moved to OnlinePartition:
case OnlinePartition =>
  assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
  partitionState(topicAndPartition) match {
    case NewPartition =>
      // initialize leader and isr path for new partition
      initializeLeaderAndIsrForPartition(topicAndPartition) //note: initialize the leader and ISR for the newly created partition
    case OfflinePartition =>
      electLeaderForPartition(topic, partition, leaderSelector)
    case OnlinePartition => // invoked when the leader needs to be re-elected
      electLeaderForPartition(topic, partition, leaderSelector)
    case _ => // should never come here since illegal previous states are checked above
  }
initializeLeaderAndIsrForPartition initializes the leader and ISR and writes that information to ZooKeeper:
the first replica in the assigned replica list becomes the leader, and all the replicas join the ISR;
a LeaderAndIsr request is then sent to every replica, and an UpdateMetadata request is sent to the brokers.
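As a minimal sketch (not the actual Kafka code; the names here are only for illustration), that rule can be expressed like this, given the replica assignment computed earlier:

object InitialLeaderAndIsr {
  case class LeaderAndIsr(leader: Int, isr: Seq[Int])

  // the first assigned replica becomes the leader, and every assigned replica joins the ISR
  def initialLeaderAndIsr(assignment: Map[Int, Seq[Int]]): Map[Int, LeaderAndIsr] =
    assignment.map { case (partition, replicas) =>
      partition -> LeaderAndIsr(leader = replicas.head, isr = replicas)
    }

  def main(args: Array[String]): Unit = {
    // e.g. partition 0 assigned to brokers [0, 1, 2] -> leader 0, isr [0, 1, 2]
    println(initialLeaderAndIsr(Map(0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 3))))
  }
}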
Finally, the replicas' state is moved from NewReplica to OnlineReplica.
With that, the topic creation is complete.