This time, let's look at how the server handles Fetch requests, and at the replica synchronization mechanism built on top of them.
Let's start with the processing flow of a single Fetch request. The overall flow is as follows.
First, based on the Fetch entry in the API keys, the server recognizes that it is handling a Fetch request and hands it to handleFetchRequest(). Inside handleFetchRequest, the first step is to check the authorization for the requested TopicPartitions and their topics.
Next, an internal function is defined to act as the response callback:
def sendResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
  ...

  def fetchResponseCallback(delayTimeMs: Int) {
    trace(s"Sending fetch response to client $clientId of " +
      s"${convertedPartitionData.map { case (_, v) => v.records.sizeInBytes }.sum} bytes")
    val fetchResponse = if (delayTimeMs > 0) new FetchResponse(versionId, fetchedPartitionData, delayTimeMs) else response
    requestChannel.sendResponse(new RequestChannel.Response(request, fetchResponse))
  }

  // When this callback is triggered, the remote API call has completed
  request.apiRemoteCompleteTimeMs = time.milliseconds

  //note: quota handling
  if (fetchRequest.isFromFollower) {
    // We've already evaluated against the quota and are good to go. Just need to record it now.
    val responseSize = sizeOfThrottledPartitions(versionId, fetchRequest, mergedPartitionData, quotas.leader)
    quotas.leader.record(responseSize)
    fetchResponseCallback(0)
  } else {
    quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, response.sizeOf, fetchResponseCallback)
  }
}
Once the callback has been built, the actual work is delegated to replicaManager.fetchMessages below:
replicaManager.fetchMessages(
  fetchRequest.maxWait.toLong,    //note: maximum time this fetch request may wait
  fetchRequest.replicaId,         //note: replica id; -1 for a consumer
  fetchRequest.minBytes,          //note: minimum bytes the fetch must return
  fetchRequest.maxBytes,          //note: maximum bytes the fetch may return
  versionId <= 2,
  authorizedRequestInfo,
  replicationQuota(fetchRequest),
  sendResponseCallback)
The next step takes us into the ReplicaManager, which manages the replicas on this broker; each replica it manages owns a unique Log. So how does the ReplicaManager handle a Fetch request?
def fetchMessages(timeout: Long,
                  replicaId: Int,
                  fetchMinBytes: Int,
                  fetchMaxBytes: Int,
                  hardMaxBytesLimit: Boolean,
                  fetchInfos: Seq[(TopicPartition, PartitionData)],
                  quota: ReplicaQuota = UnboundedQuota,
                  responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {
  val isFromFollower = replicaId >= 0 //note: is this from a consumer (replicaId = -1) or from replica synchronization?
  //note: fetches normally go to the leader; presumably this flag exists so that consuming from a follower could be supported later
  val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
  //note: a consumer fetch (true) may only read data below the HW; replica synchronization (false) has no such limit
  val fetchOnlyCommitted: Boolean = !Request.isValidBrokerId(replicaId)

  // read from local logs
  //note: read from the local log
  val logReadResults = readFromLocalLog(
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    readOnlyCommitted = fetchOnlyCommitted,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota)

  // if the fetch comes from the follower, update its corresponding log end offset
  //note: if the fetch comes from a broker's replica synchronization, update that follower's log end offset
  if(Request.isValidBrokerId(replicaId))
    updateFollowerLogReadResults(replicaId, logReadResults)

  // check if this fetch request can be satisfied right away
  val logReadResultValues = logReadResults.map { case (_, v) => v }
  val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum
  val errorReadingData = logReadResultValues.foldLeft(false) ((errorIncurred, readResult) =>
    errorIncurred || (readResult.error != Errors.NONE))

  // respond immediately if 1) fetch request does not want to wait
  //                        2) fetch request does not require any data
  //                        3) has enough data to respond
  //                        4) some error happens while reading data
  //note: return immediately if any of the following holds:
  //note: 1. no wait requested; 2. the fetch wants no data; 3. enough data has been read; 4. an error occurred while reading
  if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
    val fetchPartitionData = logReadResults.map { case (tp, result) =>
      tp -> FetchPartitionData(result.error, result.hw, result.info.records)
    }
    responseCallback(fetchPartitionData)
  } else { //note: otherwise, delay sending the response
    // construct the fetch results from the read results
    val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) =>
      val fetchInfo = fetchInfos.collectFirst {
        case (tp, v) if tp == topicPartition => v
      }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos"))
      (topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
    }
    val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
      fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
    val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)

    // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
    val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }

    // try to complete the request immediately, otherwise put it into the purgatory;
    // this is because while the delayed fetch operation is being created, new requests
    // may arrive and hence make this operation completable.
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  }
}
First, it reads the corresponding data from the local log files.
Then it checks where the fetch came from: if it is a replica synchronization request, it updates the follower's LEO (the log end offset, which we will get to shortly) and tries to update the ISR list.
Finally, it decides whether the response should be delayed.
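Before moving on, the respond-immediately decision in fetchMessages can be condensed into a small sketch (parameter names mirror the code above; the partition list is reduced to a simple count):

// Minimal sketch of the "answer right away?" check in fetchMessages above.
// requestedPartitions stands in for fetchInfos; everything else mirrors the real names.
def canRespondImmediately(timeout: Long,
                          requestedPartitions: Int,
                          bytesReadable: Long,
                          fetchMinBytes: Int,
                          errorReadingData: Boolean): Boolean =
  timeout <= 0 ||                   // 1. the request does not want to wait
  requestedPartitions == 0 ||       // 2. the request asks for no data
  bytesReadable >= fetchMinBytes || // 3. enough data has been read already
  errorReadingData                  // 4. an error occurred while reading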
Going one level deeper, let's look at readFromLocalLog, which reads the corresponding data from each topic-partition according to the requested offset:
/**
 * Read from multiple topic partitions at the given offset up to maxSize bytes
 */
//note: read the corresponding data from the tp list, starting at the requested offsets
def readFromLocalLog(replicaId: Int,
                     fetchOnlyFromLeader: Boolean,
                     readOnlyCommitted: Boolean,
                     fetchMaxBytes: Int,
                     hardMaxBytesLimit: Boolean,
                     readPartitionInfo: Seq[(TopicPartition, PartitionData)],
                     quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {

  def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
    val offset = fetchInfo.offset
    val partitionFetchSize = fetchInfo.maxBytes

    BrokerTopicStats.getBrokerTopicStats(tp.topic).totalFetchRequestRate.mark()
    BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()

    try {
      trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
        s"remaining response limit $limitBytes" +
        (if (minOneMessage) s", ignoring response/partition size limits" else ""))

      // decide whether to only fetch from leader
      //note: pick the replica according to [whether we may only read from the leader]
      //note: look up the Partition object for this tp, then get the corresponding Replica object
      val localReplica = if (fetchOnlyFromLeader)
        getLeaderReplicaIfLocal(tp)
      else
        getReplicaOrException(tp)

      // decide whether to only fetch committed data (i.e. messages below high watermark)
      //note: get the HW position; replica synchronization does not set this value
      val maxOffsetOpt = if (readOnlyCommitted)
        Some(localReplica.highWatermark.messageOffset)
      else
        None

      /* Read the LogOffsetMetadata prior to performing the read from the log.
       * We use the LogOffsetMetadata to determine if a particular replica is in-sync or not.
       * Using the log end offset after performing the read can lead to a race condition
       * where data gets appended to the log immediately after the replica has consumed from it
       * This can cause a replica to always be out of sync.
       */
      val initialLogEndOffset = localReplica.logEndOffset.messageOffset //note: the log end offset (LEO)
      val initialHighWatermark = localReplica.highWatermark.messageOffset //note: hw
      val fetchTimeMs = time.milliseconds
      val logReadInfo = localReplica.log match {
        case Some(log) =>
          val adjustedFetchSize = math.min(partitionFetchSize, limitBytes)

          // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
          //note: read from the given offset; replica synchronization does not need maxOffsetOpt
          val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage)

          // If the partition is being throttled, simply return an empty set.
          if (shouldLeaderThrottle(quota, tp, replicaId)) //note: if throttled, return an empty set
            FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
          // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
          // progress in such cases and don't need to report a `RecordTooLargeException`
          else if (!hardMaxBytesLimit && fetch.firstEntryIncomplete)
            FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
          else fetch

        case None =>
          error(s"Leader for partition $tp does not have a local log")
          FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)
      }

      //note: the result is always returned as a LogReadResult object
      LogReadResult(info = logReadInfo,
                    hw = initialHighWatermark,
                    leaderLogEndOffset = initialLogEndOffset,
                    fetchTimeMs = fetchTimeMs,
                    readSize = partitionFetchSize,
                    exception = None)
    } catch {
      // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
      // is supposed to indicate un-expected failure of a broker in handling a fetch request
      case e @ (_: UnknownTopicOrPartitionException |
                _: NotLeaderForPartitionException |
                _: ReplicaNotAvailableException |
                _: OffsetOutOfRangeException) =>
        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                      hw = -1L,
                      leaderLogEndOffset = -1L,
                      fetchTimeMs = -1L,
                      readSize = partitionFetchSize,
                      exception = Some(e))
      case e: Throwable =>
        BrokerTopicStats.getBrokerTopicStats(tp.topic).failedFetchRequestRate.mark()
        BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
        error(s"Error processing fetch operation on partition $tp, offset $offset", e)
        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                      hw = -1L,
                      leaderLogEndOffset = -1L,
                      fetchTimeMs = -1L,
                      readSize = partitionFetchSize,
                      exception = Some(e))
    }
  }

  var limitBytes = fetchMaxBytes
  val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]
  var minOneMessage = !hardMaxBytesLimit
  readPartitionInfo.foreach { case (tp, fetchInfo) =>
    val readResult = read(tp, fetchInfo, limitBytes, minOneMessage) //note: read this tp's data
    val messageSetSize = readResult.info.records.sizeInBytes
    // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
    if (messageSetSize > 0)
      minOneMessage = false
    limitBytes = math.max(0, limitBytes - messageSetSize)
    result += (tp -> readResult)
  }
  result
}
It first obtains the Partition object and, from it, the corresponding Replica.
It then finds the matching Log and reads the data via read(); a consumer fetch is additionally bounded by the HW, which we will also come back to later.
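To make that HW bound concrete, here is an illustration with made-up numbers, mirroring the maxOffsetOpt logic inside read() above:

// Illustration only: the HW caps a consumer read, but not a follower read.
val highWatermark = 950L   // highest offset replicated to all in-sync replicas
val logEndOffset  = 1000L  // highest offset written on the leader
val isConsumerFetch = true

val maxOffsetOpt: Option[Long] =
  if (isConsumerFetch) Some(highWatermark) // consumer sees offsets below 950
  else None                                // follower may read up to the LEO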
Beyond this point lies the actual reading of the log file itself, a step we can cover in a later article.
Having covered the rough flow of a Fetch request, we need to move one level up, because the replica synchronization mechanism is the foundation of a Kafka cluster: as long as a broker holds a follower replica, it runs replica synchronization to pull data from the leader. So let's work out when the replica synchronization flow is started and what its overall processing logic looks like.
First, replica synchronization in Kafka is controlled by the ReplicaManager; after all, replicas embody the leader/follower relationship. The ReplicaManager therefore holds an instance variable, replicaFetcherManager (a ReplicaFetcherManager), which is responsible for managing the replica synchronization flow.
So how is a replica fetcher started? That depends on the role the replica plays within its partition: if the replica acts as a follower, it will try to synchronize data from the leader. In other words, once a broker-local replica managed by the ReplicaManager is made a follower, a replica synchronization thread is started.
The entry point is the method makeFollowers:
private def makeFollowers(controllerId: Int,
                          epoch: Int,
                          partitionState: Map[Partition, PartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Short],
                          metadataCache: MetadataCache) : Set[Partition] = {
  partitionState.keys.foreach { partition =>
    stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
      "starting the become-follower transition for partition %s")
      .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
  }

  for (partition <- partitionState.keys)
    responseMap.put(partition.topicPartition, Errors.NONE.code)

  //note: collect the set of partitions that become followers
  val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()

  try {
    // TODO: Delete leaders from LeaderAndIsrRequest
    partitionState.foreach{ case (partition, partitionStateInfo) =>
      val newLeaderBrokerId = partitionStateInfo.leader
      //note: make sure the leader is alive
      metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
        // Only change partition state when the leader is available
        case Some(_) =>
          //note: turn the partition's local replica into a follower
          if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
            partitionsToMakeFollower += partition
          else
            //note: this partition's local replica is already a follower
            stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
              "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
                partition.topicPartition, newLeaderBrokerId))
        case None => // create the replica, and record that the leader is unavailable
          // The leader broker should always be present in the metadata cache.
          // If not, we should record the error message and abort the transition process for this partition
          stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
            " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
            .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
              partition.topicPartition, newLeaderBrokerId))
          // Create the local replica even if the leader is unavailable. This is required to ensure that we include
          // the partition's high watermark in the checkpoint file (see KAFKA-1647)
          partition.getOrCreateReplica()
      }
    }

    //note: remove the replica fetcher threads for these partitions
    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
    partitionsToMakeFollower.foreach { partition =>
      stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
        "%d epoch %d with correlation id %d for partition %s")
        .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
    }

    //note: Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
    logManager.truncateTo(partitionsToMakeFollower.map { partition =>
      (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
    }.toMap)

    //note: complete any delayed requests pending on these partitions
    partitionsToMakeFollower.foreach { partition =>
      val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
      tryCompleteDelayedProduce(topicPartitionOperationKey)
      tryCompleteDelayedFetch(topicPartitionOperationKey)
    }

    partitionsToMakeFollower.foreach { partition =>
      stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " +
        "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
        partition.topicPartition, correlationId, controllerId, epoch))
    }

    if (isShuttingDown.get()) {
      partitionsToMakeFollower.foreach { partition =>
        stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
          "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
          controllerId, epoch, partition.topicPartition))
      }
    } else {
      // we do not need to check if the leader exists again since this has been done at the beginning of this process
      //note: start the replica fetcher threads
      val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
        partition.topicPartition -> BrokerAndInitialOffset(
          metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
          partition.getReplica().get.logEndOffset.messageOffset)).toMap //note: leader info + the local replica's offset
      replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

      partitionsToMakeFollower.foreach { partition =>
        stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +
          "%d epoch %d with correlation id %d for partition %s")
          .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
      }
    }
  } catch {
    case e: Throwable =>
      val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
        "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
      stateChangeLogger.error(errorMsg, e)
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }

  partitionState.keys.foreach { partition =>
    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
      "for the become-follower transition for partition %s")
      .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
  }

  partitionsToMakeFollower
}
In the code above, the local replica of each affected partition is first switched to the follower role, and these partitions are removed from the set of partitions this broker leads.
The existing fetcher threads for these partitions are then stopped, and their logs are truncated to the high watermark.
Pending delayed produce and fetch requests for these partitions are completed.
Finally, if the current broker is not shutting down, replica synchronization threads towards these partitions' new leaders are started.
Note that there is not necessarily one thread per partition: each partition is assigned to a fetcher thread by a simple computation (sketched below), and the number of fetcher threads is fixed.
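A minimal sketch of that assignment, modeled on getFetcherId in AbstractFetcherManager; treat the exact hash as an assumption of this sketch, and note that the pool size is the configured number of fetcher threads (num.replica.fetchers):

// Sketch: map a partition onto one thread of a fixed-size fetcher pool.
// The real code additionally guards the abs() against Int.MinValue.
def getFetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
  math.abs(31 * topic.hashCode + partitionId) % numFetchers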
Following the flow above, once it is confirmed that the broker is not shutting down, the replica synchronization threads are started by calling addFetcherForPartitions:
//note: add replica-fetch threads for a set of topic-partitions
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {
  mapLock synchronized {
    //note: assign a fetch thread id to each of these topic-partitions
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialOffset) =>
      BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))}
    for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
      //note: construct the fetcherThread for this BrokerAndFetcherId
      var fetcherThread: AbstractFetcherThread = null
      fetcherThreadMap.get(brokerAndFetcherId) match {
        case Some(f) => fetcherThread = f
        case None => //note: create a new fetcher thread
          fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
          fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
          fetcherThread.start
      }

      //note: register the topic-partition list with the thread
      fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) =>
        tp -> brokerAndInitOffset.initOffset
      })
    }
  }

  info("Added fetcher for partitions %s".format(partitionAndOffsets.map { case (topicPartition, brokerAndInitialOffset) =>
    "[" + topicPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
}
In the code above, a fetcher id is computed first.
The manager then checks whether a thread already exists for that id; if not, a new fetcher thread is created and started.
Finally, the TopicPartitions are registered with their fetcher thread.
Let's keep going and see how the fetcher thread does its work. doWork in ReplicaFetcherThread is driven by run():
override def doWork() {
  //note: construct the fetch request
  val fetchRequest = inLock(partitionMapLock) {
    val fetchRequest = buildFetchRequest(partitionStates.partitionStates.asScala.map { state =>
      state.topicPartition -> state.value
    })
    if (fetchRequest.isEmpty) { //note: if there are no active partitions, sleep fetchBackOffMs before the next attempt
      trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
    }
    fetchRequest
  }
  if (!fetchRequest.isEmpty)
    processFetchRequest(fetchRequest) //note: send the fetch request and process the result
}
It first builds a fetchRequest via buildFetchRequest, and then handles it with processFetchRequest:
protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
  val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]

  partitionMap.foreach { case (topicPartition, partitionFetchState) =>
    // We will not include a replica in the fetch request if it should be throttled.
    if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition))
      requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
  }

  //note: the key point is the setReplicaId call, which sets the replicaId; for a consumer this value is CONSUMER_REPLICA_ID (-1)
  val requestBuilder = new JFetchRequest.Builder(maxWait, minBytes, requestMap).
    setReplicaId(replicaId).setMaxBytes(maxBytes)
  requestBuilder.setVersion(fetchRequestVersion)
  new FetchRequest(requestBuilder)
}
Here the replicaId is set on the request, which lets the leader tell whether the fetch comes from a follower synchronizing data.
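Schematically, this is what the leader's isFromFollower check (seen earlier in fetchMessages) keys off:

// Sketch: how the leader tells the two fetch sources apart.
// Consumers send replicaId = -1 (CONSUMER_REPLICA_ID); followers send their broker id (>= 0).
def isFromFollower(replicaId: Int): Boolean = replicaId >= 0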
Next comes processFetchRequest:
private def processFetchRequest(fetchRequest: REQ) {
  val partitionsWithError = mutable.Set[TopicPartition]()

  def updatePartitionsWithError(partition: TopicPartition): Unit = {
    partitionsWithError += partition
    partitionStates.moveToEnd(partition)
  }

  var responseData: Seq[(TopicPartition, PD)] = Seq.empty

  try {
    trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
    responseData = fetch(fetchRequest) //note: send the fetch request and collect the result
  } catch {
    case t: Throwable =>
      if (isRunning.get) {
        warn(s"Error in fetch $fetchRequest", t)
        inLock(partitionMapLock) { //note: an error occurred during the fetch, sleep a while
          partitionStates.partitionSet.asScala.foreach(updatePartitionsWithError)
          // there is an error occurred while fetching partitions, sleep a while
          // note that `ReplicaFetcherThread.handlePartitionsWithError` will also introduce the same delay for every
          // partition with error effectively doubling the delay. It would be good to improve this.
          partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
        }
      }
  }
  fetcherStats.requestRate.mark()

  if (responseData.nonEmpty) { //note: the fetch result is non-empty
    // process fetched data
    inLock(partitionMapLock) {
      responseData.foreach { case (topicPartition, partitionData) =>
        val topic = topicPartition.topic
        val partitionId = topicPartition.partition
        Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState =>
          // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
          //note: if the requested offset matches the offset in the result and there is no error, append the fetched data to the partition
          if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.offset) {
            Errors.forCode(partitionData.errorCode) match {
              case Errors.NONE =>
                try {
                  val records = partitionData.toRecords
                  val newOffset = records.shallowEntries.asScala.lastOption.map(_.nextOffset).getOrElse(
                    currentPartitionFetchState.offset)

                  fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                  //note: append the fetched data to the log file
                  processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData)

                  val validBytes = records.validBytes
                  if (validBytes > 0) {
                    // Update partitionStates only if there is no exception during processPartitionData
                    //note: advance the fetch offset
                    partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
                    fetcherStats.byteRate.mark(validBytes) //note: update metrics
                  }
                } catch {
                  case ime: CorruptRecordException =>
                    // we log the error and continue. This ensures two things
                    // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                    // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                    //    should get fixed in the subsequent fetches
                    //note: on a CRC failure, log and continue (this thread also serves other tps; this avoids disturbing their synchronization)
                    logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage)
                    updatePartitionsWithError(topicPartition)
                  case e: Throwable =>
                    //note: anything else is re-thrown here, as a runtime exception
                    throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                      .format(topic, partitionId, currentPartitionFetchState.offset), e)
                }
              case Errors.OFFSET_OUT_OF_RANGE => //note: handling of the out-of-range case
                try {
                  val newOffset = handleOffsetOutOfRange(topicPartition)
                  partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
                  error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                    .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
                } catch {
                  //note: exceptions thrown while handling out-of-range
                  case e: Throwable =>
                    error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                    updatePartitionsWithError(topicPartition)
                }
              case _ => //note: all other error cases
                if (isRunning.get) {
                  error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                    partitionData.exception.get))
                  updatePartitionsWithError(topicPartition)
                }
            }
          })
      }
    }
  }

  //note: handle the tps that hit errors during the fetch
  if (partitionsWithError.nonEmpty) {
    debug("handling partitions with error for %s".format(partitionsWithError))
    handlePartitionsWithErrors(partitionsWithError)
  }
}
Inside it, fetch() sends the fetch request and pulls data from the leader.
If the send fails with an exception, the thread sleeps for a while before the next attempt.
If the returned data is non-empty, the fetched records are appended to the local replica's log file.
Errors anywhere in this flow are handled according to their type, and a pause is inserted before the next send, with the interval set by replica.fetch.backoff.ms.
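Here is a minimal, self-contained sketch of that retry-with-backoff pattern; the real thread waits on partitionMapCond so it can be woken early, and Thread.sleep is a deliberate simplification:

// Sketch of the backoff used above; fetchBackOffMs corresponds to the
// replica.fetch.backoff.ms broker config.
def fetchWithBackoff[T](fetchBackOffMs: Long)(doFetch: () => T): Option[T] =
  try Some(doFetch())
  catch {
    case _: Exception =>
      Thread.sleep(fetchBackOffMs) // back off before the caller retries
      None
  }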
The append itself lives in the processPartitionData function:
// process fetched data
//note: process the fetched data, appending it to the log file
def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
  try {
    val replica = replicaMgr.getReplica(topicPartition).get
    val records = partitionData.toRecords

    //note: sanity-check the records
    maybeWarnIfOversizedRecords(records, topicPartition)

    if (fetchOffset != replica.logEndOffset.messageOffset)
      throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
    if (logger.isTraceEnabled)
      trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
        .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
    replica.log.get.append(records, assignOffsets = false) //note: append the fetched data to the log
    if (logger.isTraceEnabled)
      trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
        .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
    //note: update the replica's hw (logEndOffset is also updated immediately after the append)
    val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
    // for the follower replica, we do not need to keep
    // its segment base offset the physical position,
    // these values will be computed upon making the leader
    //note: those values mainly matter on the leader replica
    replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
    if (logger.isTraceEnabled)
      trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
    if (quota.isThrottled(topicPartition))
      quota.record(records.sizeInBytes)
  } catch {
    case e: KafkaStorageException =>
      fatal(s"Disk error while replicating data for $topicPartition", e)
      Runtime.getRuntime.halt(1)
  }
}
The one special step here is comparing the local LEO with the HW returned by the leader, and setting the local HW to the smaller of the two.
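A worked example with hypothetical numbers makes that min() concrete:

// The follower's HW can never run ahead of what it has actually written.
val followerLeo = 800L  // local log end offset right after the append
val leaderHw    = 1000L // high watermark returned by the leader
val followerHw  = math.min(followerLeo, leaderHw) // => 800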
There is also the error-driven path mentioned above; the main scenarios it has to handle are the following:
1. Suppose the current leader's LEO is 1000 and a follower's LEO is 800, and then the leader drops offline and the follower takes over, so the partition's LEO is now 800. The old leader, whose local LEO is still 1000, comes back and wants to fetch. How should this be handled?
2. Suppose a replica with LEO 10 goes offline for several days; when it reconnects, the leader's log has moved on to the range [100, 1000]. How should it fetch?
For both cases, Kafka's solution is as follows.
val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP,
  brokerConfig.brokerId)
Having obtained the leader's end offset, it checks whether the local LEO is larger:
if (leaderEndOffset < replica.logEndOffset.messageOffset) { //note: the case where leaderEndOffset is smaller than the replica's LEO
  // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
  // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
  // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
  //note: this situation can only arise from an unclean leader election
  if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
    ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
    //note: if unclean leader election is not allowed, exit the process directly
    // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
    fatal("Exiting because log truncation is not allowed for partition %s,".format(topicPartition) +
      " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
      .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
    System.exit(1)
  }
If the local LEO really is larger than the leader's (scenario 1) and the configuration does not allow unclean leader election, the broker exits immediately to avoid unexpected data loss.
Otherwise, processing continues as follows:
val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP,
  brokerConfig.brokerId)
warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
  .format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
// Only truncate log when current leader's log start offset is greater than follower's log end offset.
if (leaderStartOffset > replica.logEndOffset.messageOffset) //note: if the leader's startOffset is greater than the replica's largest offset
  //note: wipe this log completely and start fetching from leaderStartOffset
  replicaMgr.logManager.truncateFullyAndStartAt(topicPartition, leaderStartOffset)
offsetToFetch
To summarize the OutOfRange handling above:
If the follower has been offline for a long time (scenario 2) and its LEO has fallen below the leader's start offset, the local log is wiped completely and synchronization restarts from leaderStartOffset.
If an unclean election was triggered and the current follower's offset runs ahead of the new leader's, it falls back to the leader's offset and resumes syncing from there; but don't forget that this can leave replicas with inconsistent data.
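Putting the two scenarios together, the decision can be condensed like this; the sketch below is distilled from the excerpts above, not the full handleOffsetOutOfRange implementation:

// Condensed sketch of the OFFSET_OUT_OF_RANGE handling.
def resolveOutOfRange(replicaLeo: Long,
                      leaderStartOffset: Long,
                      leaderEndOffset: Long,
                      uncleanLeaderElectionEnable: Boolean): Long =
  if (leaderEndOffset < replicaLeo) {
    // Scenario 1: this follower is ahead of an unclean-elected leader.
    if (!uncleanLeaderElectionEnable)
      sys.exit(1) // truncation disallowed: halt rather than silently lose data
    leaderEndOffset // truncate the local log and resume from the leader's end
  } else {
    // Scenario 2: we fell far behind; the leader's start offset may have
    // advanced past our LEO, in which case the local log is wiped first.
    math.max(leaderStartOffset, replicaLeo)
  }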
And that wraps up the basic Fetch flow.