This post looks at how the Kafka server (broker) handles a Fetch request, and at how the replica synchronization mechanism works.

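We start with the processing flow of a single Fetch request.

The overall flow is as follows.

The request is first matched against the API keys; once it is identified as a Fetch request, it is handed to handleFetchRequest().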

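Inside handleFetchRequest, the first step is to check the requested TopicPartitions and the permissions on their topics.

Next, an inner function is defined to serve as the callback: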

def sendResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {

….

def fetchResponseCallback(delayTimeMs: Int) {

trace(s"Sending fetch response to client $clientId of " +

s"${convertedPartitionData.map { case (_, v) => v.records.sizeInBytes }.sum} bytes")

val fetchResponse = if (delayTimeMs > 0) new FetchResponse(versionId, fetchedPartitionData, delayTimeMs) else response

requestChannel.sendResponse(new RequestChannel.Response(request, fetchResponse))

}

// When this callback is triggered, the remote API call has completed

request.apiRemoteCompleteTimeMs = time.milliseconds

//note: quota handling

if (fetchRequest.isFromFollower) {

// We've already evaluated against the quota and are good to go. Just need to record it now.

val responseSize = sizeOfThrottledPartitions(versionId, fetchRequest, mergedPartitionData, quotas.leader)

quotas.leader.record(responseSize)

fetchResponseCallback(0)

} else {

quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, response.sizeOf, fetchResponseCallback)

}

}

Once the callback has been defined,

the actual work is done by fetchMessages in replicaManager:

replicaManager.fetchMessages(

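fetchRequest.maxWait.toLong, //note: maximum time the fetch request may wait

fetchRequest.replicaId, //note: replica id; -1 for a consumer

fetchRequest.minBytes, //note: minimum number of bytes the fetch request asks for

fetchRequest.maxBytes, //note: maximum number of bytes the fetch request asks for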

versionId <= 2,

authorizedRequestInfo,

replicationQuota(fetchRequest),

sendResponseCallback)}

The next step takes place inside ReplicaManager, which manages the replicas hosted on this broker; each replica has its own unique Log.

So how does ReplicaManager handle a Fetch request?

def fetchMessages(timeout: Long,

replicaId: Int,

fetchMinBytes: Int,

fetchMaxBytes: Int,

hardMaxBytesLimit: Boolean,

fetchInfos: Seq[(TopicPartition, PartitionData)],

quota: ReplicaQuota = UnboundedQuota,

responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {

val isFromFollower = replicaId >= 0 //note: whether the request comes from a consumer (replicaId is -1) or from replica synchronization

//note: fetch from the leader by default; presumably this flag exists only so that a debugging consumer can read from a follower

val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId

//note: for a consumer request (true), only data below the HW is read; a replica sync request has no such limit (false)

val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)

// read from local logs

//note: read the data from the local log

val logReadResults = readFromLocalLog(

replicaId = replicaId,

fetchOnlyFromLeader = fetchOnlyFromLeader,

readOnlyCommitted = fetchOnlyCommitted,

fetchMaxBytes = fetchMaxBytes,

hardMaxBytesLimit = hardMaxBytesLimit,

readPartitionInfo = fetchInfos,

quota = quota)

// if the fetch comes from the follower,

// update its corresponding log end offset

//note: if the fetch comes from a follower broker's replica sync, update that follower's log end offset

if(Request.isValidBrokerId(replicaId))

updateFollowerLogReadResults(replicaId, logReadResults)

// check if this fetch request can be satisfied right away

val logReadResultValues = logReadResults.map { case (_, v) => v }

val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum

val errorReadingData = logReadResultValues.foldLeft(false) ((errorIncurred, readResult) =>

errorIncurred || (readResult.error != Errors.NONE))

// respond immediately if 1) fetch request does not want to wait

//                        2) fetch request does not require any data

//                        3) has enough data to respond

//                        4) some error happens while reading data

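//note: the result is returned immediately if any one of the following holds:

//note: 1. the request does not want to wait (timeout <= 0); 2. the request asks for no partitions; 3. enough data has been read; 4. an error occurred while reading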

if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {

val fetchPartitionData = logReadResults.map { case (tp, result) =>

tp -> FetchPartitionData(result.error, result.hw, result.info.records)

}

responseCallback(fetchPartitionData)

} else {

//note: in all other cases, delay the response

// construct the fetch results from the read results

val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) =>

val fetchInfo = fetchInfos.collectFirst {

case (tp, v) if tp == topicPartition => v

}.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos"))

(topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))

}

val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,

fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)

val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)

// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation

val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }

// try to complete the request immediately, otherwise put it into the purgatory;

// this is because while the delayed fetch operation is being created, new requests

// may arrive and hence make this operation completable.

delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)

}

}

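First, the requested data is read from the local log files.

Then the source of the fetch is checked: if it is a replica synchronization request, the LEO (log end offset, covered later) that the leader records for that follower is updated, and an update of the ISR list is attempted.

Finally, it decides whether the response has to be delayed.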
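The "respond now or delay" decision can be isolated into a small predicate. The following is a minimal sketch that mirrors the `if` condition in fetchMessages; the object name `FetchResponseDecision` and its parameter names are hypothetical and exist only for illustration.

```scala
object FetchResponseDecision {
  // Hypothetical helper: returns true when the fetch can be answered at once
  // instead of being parked as a delayed operation.
  def respondImmediately(timeoutMs: Long,
                         requestedPartitions: Int,
                         bytesReadable: Long,
                         fetchMinBytes: Int,
                         errorReadingData: Boolean): Boolean =
    timeoutMs <= 0 ||                   // 1) the request does not want to wait
      requestedPartitions == 0 ||       // 2) the request asks for no data
      bytesReadable >= fetchMinBytes || // 3) enough data is already available
      errorReadingData                  // 4) an error occurred while reading
}
```

With a positive timeout, no error, and fewer readable bytes than fetchMinBytes, the predicate is false, which corresponds to the branch that builds a DelayedFetch.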

Going one level deeper, let's look at readFromLocalLog.

It reads the data of each TopicPartition (tp) starting at the requested offset.

/**

* Read from multiple topic partitions at the given offset up to maxSize bytes

*/

//note: read the data of each tp in the list at its requested offset

def readFromLocalLog(replicaId: Int,

fetchOnlyFromLeader: Boolean,

readOnlyCommitted: Boolean,

fetchMaxBytes: Int,

hardMaxBytesLimit: Boolean,

readPartitionInfo: Seq[(TopicPartition, PartitionData)],

quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {

def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {

val offset = fetchInfo.offset

val partitionFetchSize = fetchInfo.maxBytes

BrokerTopicStats.getBrokerTopicStats(tp.topic).totalFetchRequestRate.mark()

BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()

try {

trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +

s"remaining response limit $limitBytes" +

(if (minOneMessage) s", ignoring response/partition size limits" else ""))

// decide whether to only fetch from leader

//note: pick the replica according to whether the read must go to the leader only

//note: look up the Partition object for the tp, then the corresponding Replica object

val localReplica = if (fetchOnlyFromLeader)

getLeaderReplicaIfLocal(tp)

else

getReplicaOrException(tp)

// decide whether to only fetch committed data (i.e. messages below high watermark)

//note: get the HW position; it is not applied to replica synchronization

val maxOffsetOpt = if (readOnlyCommitted)

Some(localReplica.highWatermark.messageOffset)

else

None

/* Read the LogOffsetMetadata prior to performing the read from the log.

* We use the LogOffsetMetadata to determine if a particular replica is in-sync or not.

* Using the log end offset after performing the read can lead to a race condition

* where data gets appended to the log immediately after the replica has consumed from it

* This can cause a replica to always be out of sync.

*/

val initialLogEndOffset = localReplica.logEndOffset.messageOffset //note: the end offset

val initialHighWatermark = localReplica.highWatermark.messageOffset //note: hw

val fetchTimeMs = time.milliseconds

val logReadInfo = localReplica.log match {

case Some(log) =>

val adjustedFetchSize = math.min(partitionFetchSize, limitBytes)

// Try the read first, this tells us whether we need all of adjustedFetchSize for this partition

//note: read starting at the given offset; replica synchronization passes no maxOffsetOpt

val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage)

// If the partition is being throttled, simply return an empty set.

if (shouldLeaderThrottle(quota, tp, replicaId)) //note: if throttled, return an empty set

FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)

// For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make

// progress in such cases and don't need to report a `RecordTooLargeException`

else if (!hardMaxBytesLimit && fetch.firstEntryIncomplete)

FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)

else fetch

case None =>

error(s"Leader for partition $tp does not have a local log")

FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)

}

//note: return the final result; every result is a LogReadResult object

LogReadResult(info = logReadInfo,

hw = initialHighWatermark,

leaderLogEndOffset = initialLogEndOffset,

fetchTimeMs = fetchTimeMs,

readSize = partitionFetchSize,

exception = None)

} catch {

// NOTE: Failed fetch requests metric is not incremented for known exceptions since it

// is supposed to indicate un-expected failure of a broker in handling a fetch request

case e@ (_: UnknownTopicOrPartitionException |

_: NotLeaderForPartitionException |

_: ReplicaNotAvailableException |

_: OffsetOutOfRangeException) =>

LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),

hw = -1L,

leaderLogEndOffset = -1L,

fetchTimeMs = -1L,

readSize = partitionFetchSize,

exception = Some(e))

case e: Throwable =>

BrokerTopicStats.getBrokerTopicStats(tp.topic).failedFetchRequestRate.mark()

BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()

error(s"Error processing fetch operation on partition $tp, offset $offset", e)

LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),

hw = -1L,

leaderLogEndOffset = -1L,

fetchTimeMs = -1L,

readSize = partitionFetchSize,

exception = Some(e))

}

}

var limitBytes = fetchMaxBytes

val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]

var minOneMessage = !hardMaxBytesLimit

readPartitionInfo.foreach { case (tp, fetchInfo) =>

val readResult = read(tp, fetchInfo, limitBytes, minOneMessage) //note: read the data of this tp

val messageSetSize = readResult.info.records.sizeInBytes

// Once we read from a non-empty partition, we stop ignoring request and partition level size limits

if (messageSetSize > 0)

minOneMessage = false

limitBytes = math.max(0, limitBytes - messageSetSize)

result += (tp -> readResult)

}

result

}

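It first obtains the Partition object and then gets the Replica from it.

It then locates the corresponding Log and reads the data with read(). For a fetch issued by a client (consumer), the read is additionally capped at the HW (high watermark), which is also covered later.

What follows is the actual read of the log file, which we can leave for a later post.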
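The byte budget that readFromLocalLog threads through the partitions (limitBytes and minOneMessage) is easier to see in isolation. The sketch below is a simplified model and not the broker's code: it assumes each partition is just a list of message sizes and that reads return whole messages only. The names `FetchBudgetSketch`, `read`, and `allocate` are hypothetical.

```scala
object FetchBudgetSketch {
  // Simplified read: total size of the longest prefix of whole messages that
  // fits into maxBytes. If nothing fits and minOneMessage is set, the first
  // message is returned anyway so that the reader can make progress.
  def read(messages: Seq[Int], maxBytes: Int, minOneMessage: Boolean): Int = {
    val fitting = messages.scanLeft(0)(_ + _).takeWhile(_ <= maxBytes).last
    if (fitting == 0 && minOneMessage) messages.headOption.getOrElse(0) else fitting
  }

  // partitions: (name, per-partition max bytes, message sizes), in request order.
  // Returns how many bytes each partition contributes to the response.
  def allocate(fetchMaxBytes: Int,
               hardMaxBytesLimit: Boolean,
               partitions: Seq[(String, Int, Seq[Int])]): Seq[(String, Int)] = {
    var limitBytes = fetchMaxBytes
    var minOneMessage = !hardMaxBytesLimit
    partitions.map { case (tp, partitionMaxBytes, messages) =>
      val size = read(messages, math.min(partitionMaxBytes, limitBytes), minOneMessage)
      // After the first non-empty read, the size limits are enforced again.
      if (size > 0) minOneMessage = false
      limitBytes = math.max(0, limitBytes - size)
      tp -> size
    }
  }
}
```

In this model, a single oversized message in the first partition is still returned when hardMaxBytesLimit is false, and it uses up the whole budget, so the later partitions in the same request return nothing.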

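Having covered the broad flow of a Fetch request, we now need to move up a level, because replica synchronization is the foundation on which a Kafka cluster is built: whenever a broker hosts a replica that is a follower, it runs a replica fetcher that pulls data from the leader. So let's consider when the replica synchronization flow is started and what its overall logic looks like.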

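First, replica synchronization in Kafka is controlled by ReplicaManager, since it is the component that knows which replicas are leaders and which are followers.

It therefore holds an instance variable, replicaFetcherManager (a ReplicaFetcherManager), which manages the replica synchronization flow.

So how is a replica fetcher started?

That depends on the role a Replica plays in its Partition: if the Replica is a follower, it will try to sync data from the leader.

In other words, when ReplicaManager is told that a local replica on this broker has become a follower, the replica fetcher thread is started.

The entry point is the makeFollowers method: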

private def makeFollowers(controllerId: Int,

epoch: Int,

partitionState: Map[Partition, PartitionState],

correlationId: Int,

responseMap: mutable.Map[TopicPartition, Short],

metadataCache: MetadataCache) : Set[Partition] = {

partitionState.keys.foreach { partition =>

stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +

"starting the become-follower transition for partition %s")

.format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))

}

for (partition <- partitionState.keys)

responseMap.put(partition.topicPartition, Errors.NONE.code)

//note: collect the set of partitions that become followers

val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()

try {

// TODO: Delete leaders from LeaderAndIsrRequest

partitionState.foreach{ case (partition, partitionStateInfo) =>

val newLeaderBrokerId = partitionStateInfo.leader

//note: make sure the leader is alive

metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {

// Only change partition state when the leader is available

case Some(_) => //note: make the local replica of the partition a follower

if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))

partitionsToMakeFollower += partition

else //note: the local replica of this partition is already a follower of this leader

stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +

"controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")

.format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,

partition.topicPartition, newLeaderBrokerId))

case None => //note: create the replica anyway, and log that the leader is unavailable

// The leader broker should always be present in the metadata cache.

// If not, we should record the error message and abort the transition process for this partition

stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +

" %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")

.format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,

partition.topicPartition, newLeaderBrokerId))

// Create the local replica even if the leader is unavailable. This is required to ensure that we include

// the partition's high watermark in the checkpoint file (see KAFKA-1647)

partition.getOrCreateReplica()

}

}

//note: remove the existing replica fetchers for these partitions

replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))

partitionsToMakeFollower.foreach { partition =>

stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +

"%d epoch %d with correlation id %d for partition %s")

.format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))

}

//note: Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset

logManager.truncateTo(partitionsToMakeFollower.map { partition =>

(partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)

}.toMap)

//note: try to complete the delayed requests

partitionsToMakeFollower.foreach { partition =>

val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)

tryCompleteDelayedProduce(topicPartitionOperationKey)

tryCompleteDelayedFetch(topicPartitionOperationKey)

}

partitionsToMakeFollower.foreach { partition =>

stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " +

"become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,

partition.topicPartition, correlationId, controllerId, epoch))

}

if (isShuttingDown.get()) {

partitionsToMakeFollower.foreach { partition =>

stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +

"controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,

controllerId, epoch, partition.topicPartition))

}

}

else {

// we do not need to check if the leader exists again since this has been done at the beginning of this process

//note: start the replica fetcher threads

val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>

partition.topicPartition -> BrokerAndInitialOffset(

metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),

partition.getReplica().get.logEndOffset.messageOffset)).toMap //note: leader info + offset of the local replica

replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

partitionsToMakeFollower.foreach { partition =>

stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +

"%d epoch %d with correlation id %d for partition %s")

.format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))

}

}

} catch {

case e: Throwable =>

val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +

"epoch %d").format(localBrokerId, correlationId, controllerId, epoch)

stateChangeLogger.error(errorMsg, e)

// Re-throw the exception for it to be caught in KafkaApis

throw e

}

partitionState.keys.foreach { partition =>

stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +

"for the become-follower transition for partition %s")

.format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))

}

partitionsToMakeFollower

}

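In the code above, the local replica of each partition is first made a follower, provided the new leader is alive.

Then the existing fetchers for these partitions are removed.

Next, the logs of these partitions are truncated to their HW.

Delayed produce and fetch requests on these partitions are then given a chance to complete.

If the broker is not shutting down, replica fetcher threads are started toward the new leaders of these partitions.

Note that there is not necessarily one thread per partition: each partition is assigned to a fetcher thread by a computation, and the number of fetcher threads is fixed.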
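As an illustration of how a fixed pool of fetcher threads can be shared by many partitions, here is a minimal sketch of a hash-based assignment. The formula is an assumption about what getFetcherId computes and is not taken from the excerpt; `FetcherIdSketch` and its parameters are hypothetical names, and `numFetchers` stands for the configured number of fetcher threads per source broker (presumably the `num.replica.fetchers` setting).

```scala
object FetcherIdSketch {
  // Assumed scheme: a non-negative hash of (topic, partition) taken modulo
  // the number of fetcher threads, so the result is always in [0, numFetchers).
  def fetcherId(topic: String, partition: Int, numFetchers: Int): Int = {
    val h = 31 * topic.hashCode + partition
    val nonNegative = if (h == Int.MinValue) 0 else math.abs(h)
    nonNegative % numFetchers
  }
}
```

Because the id depends only on the topic name and partition number, the same partition always lands on the same fetcher id, and several partitions led by the same broker can share one thread.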

Following the flow above, once the broker is confirmed not to be shutting down, the replica fetcher threads are started,

that is, addFetcherForPartitions is called:

//note: add replica fetcher threads for a set of topic-partitions

def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {

mapLock synchronized {

//note: assign a fetcher thread id to each of these topic-partitions

val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialOffset) =>

BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))}

for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {

//note: get or create the fetcherThread for this BrokerAndFetcherId

var fetcherThread: AbstractFetcherThread = null

fetcherThreadMap.get(brokerAndFetcherId) match {

case Some(f) => fetcherThread = f

case None =>

//note: create the fetcher thread

fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)

fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)

fetcherThread.start

}

//note: add the topic-partition list to it

fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) =>

tp -> brokerAndInitOffset.initOffset

})

}

}

info("Added fetcher for partitions %s".format(partitionAndOffsets.map { case (topicPartition, brokerAndInitialOffset) =>

"[" + topicPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))

}

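In the code above, the fetcher id is computed first.

Then it checks whether a thread for that broker and fetcher id already exists; if not, a new fetcher thread is created.

Finally, the TopicPartitions and their initial offsets are registered with that fetcher thread.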
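The group-then-get-or-create pattern used by addFetcherForPartitions can be sketched without any threads. In the sketch below, `Fetcher` is a hypothetical stand-in for a fetcher thread that only remembers its partitions, partitions are plain strings, and all names are made up for illustration.

```scala
import scala.collection.mutable

object FetcherRegistrySketch {
  final case class BrokerAndFetcherId(brokerId: Int, fetcherId: Int)

  // Stand-in for a fetcher thread: it only remembers partition -> start offset.
  final class Fetcher {
    val partitions: mutable.Map[String, Long] = mutable.Map.empty
  }

  final class Registry {
    val fetchers: mutable.Map[BrokerAndFetcherId, Fetcher] = mutable.Map.empty

    // Each assignment is (partition name, leader broker id, fetcher id, initial offset).
    def add(assignments: Seq[(String, Int, Int, Long)]): Unit = synchronized {
      assignments
        .groupBy { case (_, broker, fetcherId, _) => BrokerAndFetcherId(broker, fetcherId) }
        .foreach { case (key, group) =>
          // Reuse the fetcher for this (broker, fetcher id) pair, or create it once.
          val fetcher = fetchers.getOrElseUpdate(key, new Fetcher)
          group.foreach { case (tp, _, _, offset) => fetcher.partitions.put(tp, offset) }
        }
    }
  }
}
```

Adding three partitions, two led by broker 1 and one led by broker 2 and all with fetcher id 0, yields two fetchers; a later call for another partition on broker 1 reuses the existing one.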

Let's go on and see what the fetcher thread does.

doWork in ReplicaFetcherThread is invoked from run():

override def doWork() {

//note: build the fetch request

val fetchRequest = inLock(partitionMapLock) {

val fetchRequest = buildFetchRequest(partitionStates.partitionStates.asScala.map { state =>

state.topicPartition -> state.value

})

if (fetchRequest.isEmpty) { //note: if there are no active partitions, back off for fetchBackOffMs before the next call

trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))

partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)

}

fetchRequest

}

if (!fetchRequest.isEmpty)

processFetchRequest(fetchRequest) //note: send the fetch request and process the result

}

It first builds a fetchRequest with buildFetchRequest and then handles it in processFetchRequest.

protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {

val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]

partitionMap.foreach { case (topicPartition, partitionFetchState) =>

// We will not include a replica in the fetch request if it should be throttled.

if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition))

requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))

}

//note: the key is setReplicaId, which sets the replicaId; for a consumer the value is CONSUMER_REPLICA_ID (-1)

val requestBuilder = new JFetchRequest.Builder(maxWait, minBytes, requestMap).

setReplicaId(replicaId).setMaxBytes(maxBytes)

requestBuilder.setVersion(fetchRequestVersion)

new FetchRequest(requestBuilder)

}

The replicaId is set here so that the leader can tell whether the fetch comes from a follower that is syncing data.
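How the leader interprets the replicaId can be summarized in a few predicates. This is a minimal sketch that mirrors the three flags computed at the top of fetchMessages; the object name is hypothetical, and the value -2 for the debugging consumer id is an assumption, since the excerpt only shows the constant's name.

```scala
object ReplicaIdSketch {
  val ConsumerReplicaId = -1   // ordinary consumer, as stated in the text
  val DebuggingConsumerId = -2 // assumption: value behind Request.DebuggingConsumerId

  // A non-negative replica id is a broker id, so the request is a replica sync.
  def isFromFollower(replicaId: Int): Boolean = replicaId >= 0

  // Only the debugging consumer may read from a non-leader replica.
  def fetchOnlyFromLeader(replicaId: Int): Boolean = replicaId != DebuggingConsumerId

  // Consumers are limited to data below the HW; followers are not.
  def fetchOnlyCommitted(replicaId: Int): Boolean = !isFromFollower(replicaId)
}
```

For an ordinary consumer the result is "leader only, committed data only", while for a follower with broker id 3 it is "leader only, up to the log end offset".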

Next comes processFetchRequest:

private def processFetchRequest(fetchRequest: REQ) {

val partitionsWithError = mutable.Set[TopicPartition]()

def updatePartitionsWithError(partition: TopicPartition): Unit = {

partitionsWithError += partition

partitionStates.moveToEnd(partition)

}

var responseData: Seq[(TopicPartition, PD)] = Seq.empty

try {

trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))

responseData = fetch(fetchRequest) //note: send the fetch request and get the result

} catch {

case t: Throwable =>

if (isRunning.get) {

warn(s"Error in fetch $fetchRequest", t)

inLock(partitionMapLock) { //note: an error occurred during the fetch; sleep for a while

partitionStates.partitionSet.asScala.foreach(updatePartitionsWithError)

// there is an error occurred while fetching partitions, sleep a while

// note that `ReplicaFetcherThread.handlePartitionsWithError` will also introduce the same delay for every

// partition with error effectively doubling the delay. It would be good to improve this.

partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)

}

}

}

fetcherStats.requestRate.mark()

if (responseData.nonEmpty) { //note: the fetch result is not empty

// process fetched data

inLock(partitionMapLock) {

responseData.foreach { case (topicPartition, partitionData) =>

val topic = topicPartition.topic

val partitionId = topicPartition.partition

Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState =>

// we append to the log if the current offset is defined and it is the same as the offset requested during fetch

//note: if the offset requested in the fetch equals the current fetch offset and the response carries no error, append the fetched data to the corresponding partition

if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.offset) {

Errors.forCode(partitionData.errorCode) match {

case Errors.NONE =>

try {

val records = partitionData.toRecords

val newOffset = records.shallowEntries.asScala.lastOption.map(_.nextOffset).getOrElse(

currentPartitionFetchState.offset)

fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)

// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread

//note: append the fetched data to the log file

processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData)

val validBytes = records.validBytes

if (validBytes > 0) {

// Update partitionStates only if there is no exception during processPartitionData

//note: update the fetch offset

partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))

fetcherStats.byteRate.mark(validBytes) //note: update metrics

}

} catch {

case ime: CorruptRecordException =>

// we log the error and continue. This ensures two things

// 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag

// 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and

// should get fixed in the subsequent fetches

//note: on a CRC check failure, log the error and continue (this thread also fetches other tps, so their synchronization must not be affected)

logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage)

updatePartitionsWithError(topicPartition);

case e: Throwable =>

//note: an exception is still thrown here; KafkaException is a RuntimeException

throw new KafkaException("error processing data for partition [%s,%d] offset %d"

.format(topic, partitionId, currentPartitionFetchState.offset), e)

}

case Errors.OFFSET_OUT_OF_RANGE => //note: handle the out-of-range case

try {

val newOffset = handleOffsetOutOfRange(topicPartition)

partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))

error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"

.format(currentPartitionFetchState.offset, topic, partitionId, newOffset))

} catch { //note: handle exceptions thrown while handling the out-of-range case

case e: Throwable =>

error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)

updatePartitionsWithError(topicPartition)

}

case _ => //note: any other error

if (isRunning.get) {

error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,

partitionData.exception.get))

updatePartitionsWithError(topicPartition)

}

}

})

}

}

}

//note: handle the tps that hit an error during the fetch

if (partitionsWithError.nonEmpty) {

debug("handling partitions with error for %s".format(partitionsWithError))

handlePartitionsWithErrors(partitionsWithError)

}

}

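Here fetch() sends the fetch request and pulls data from the leader.

If sending fails with an exception, the thread sleeps for a while.

If the returned data is not empty, the fetched data is appended to the log of the local replica.

Errors in this flow are handled according to their type, and a delay is inserted before the next fetch; its length is set by replica.fetch.backoff.ms.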
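The per-partition branching in processFetchRequest can be condensed into a small classification function. This is a simplified sketch, not the thread's code: error codes are modeled as plain strings, and `FetchResultSketch`, `Outcome`, and `classify` are hypothetical names.

```scala
object FetchResultSketch {
  sealed trait Outcome
  final case class Append(nextFetchOffset: Long) extends Outcome // append data, then advance
  case object ResetOffset extends Outcome                        // out-of-range handling decides the new offset
  case object BackOff extends Outcome                            // mark the partition as failed and delay it
  case object Ignore extends Outcome                             // fetch state changed while the request was in flight

  // lastNextOffset: the offset following the last fetched record, if any record was returned.
  def classify(requestedOffset: Long,
               currentOffset: Long,
               error: String,
               lastNextOffset: Option[Long]): Outcome =
    if (requestedOffset != currentOffset) Ignore
    else error match {
      case "NONE"                => Append(lastNextOffset.getOrElse(currentOffset))
      case "OFFSET_OUT_OF_RANGE" => ResetOffset
      case _                     => BackOff
    }
}
```

An empty but successful response leaves the fetch offset where it was, which is why the next request asks for the same offset again.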

The append itself happens in processPartitionData:

// process fetched data

//note: process the fetched data by appending it to the log file

def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {

try {

val replica = replicaMgr.getReplica(topicPartition).get

val records = partitionData.toRecords

//note: check the records

maybeWarnIfOversizedRecords(records, topicPartition)

if (fetchOffset != replica.logEndOffset.messageOffset)

throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset))

if (logger.isTraceEnabled)

trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"

.format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))

replica.log.get.append(records, assignOffsets = false) //note: append the fetched data to the log

if (logger.isTraceEnabled)

trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"

.format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))

//note: update the replica's hw (logEndOffset is updated immediately after the append)

val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)

// for the follower replica, we do not need to keep

// its segment base offset the physical position,

// these values will be computed upon making the leader

//note: these values matter mainly on the leader replica

replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)

if (logger.isTraceEnabled)

trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")

if (quota.isThrottled(topicPartition))

quota.record(records.sizeInBytes)

} catch {

case e: KafkaStorageException =>

fatal(s"Disk error while replicating data for $topicPartition", e)

Runtime.getRuntime.halt(1)

}

}

The notable step here is that the local HW is set to the smaller of the local LEO and the HW returned by the leader.
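The follower's high watermark rule is a one-liner, shown here as a minimal sketch with hypothetical names. Taking the minimum means a follower never advertises data as committed that it has not yet written locally, even when the leader's HW is already ahead of it.

```scala
object FollowerHwSketch {
  // The follower's HW can exceed neither its own log end offset nor the leader's HW.
  def followerHighWatermark(localLogEndOffset: Long, leaderHighWatermark: Long): Long =
    math.min(localLogEndOffset, leaderHighWatermark)
}
```

For example, a follower whose LEO is 800 while the leader reports an HW of 1000 sets its own HW to 800; once it has caught up to an LEO of 1000 and the leader reports 900, it sets its HW to 900.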

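The flow above also contains a branch that handles errors. The main scenarios it has to cover are:

1. Suppose the leader's LEO is 1000 and a follower's LEO is 800. The leader goes offline and the follower is promoted, so the new leader's LEO is 800. The old leader still has an LEO of 1000 when it comes back as a follower; how should it fetch?

2. Suppose a replica with an LEO of 10 has been offline for several days, and when it reconnects the leader's log covers the offsets [100, 1000]. How should it fetch?

Kafka handles these cases as follows: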

  val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP,

brokerConfig.brokerId)

First, the leader's latest offset (its LEO) is fetched.

Then it is checked whether that offset is smaller than the local LEO:

if (leaderEndOffset < replica.logEndOffset.messageOffset) { //note: the leader's end offset is smaller than the replica's LEO

// Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.

// This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,

// we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.

//note: this can only happen after an unclean leader election

if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,

ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) { //note: if unclean election is not allowed, exit the process

// Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.

fatal("Exiting because log truncation is not allowed for partition %s,".format(topicPartition) +

" Current leader %d's latest offset %d is less than replica %d's latest offset %d"

.format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))

System.exit(1)

}

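If the leader's end offset really is smaller than the local LEO and the configuration does not allow unclean leader election, the broker exits immediately. If unclean election is allowed, the follower truncates its log to the leader's end offset and resumes fetching from there (that part is not shown in the excerpt).

Otherwise, when the leader's end offset is not smaller than the local LEO, the following is done: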

val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP,

brokerConfig.brokerId)

warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"

.format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))

val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)

// Only truncate log when current leader's log start offset is greater than follower's log end offset.

if (leaderStartOffset > replica.logEndOffset.messageOffset) //note: the leader's start offset is greater than the replica's largest offset

//note: clear this log completely and start fetching from leaderStartOffset

replicaMgr.logManager.truncateFullyAndStartAt(topicPartition, leaderStartOffset)

offsetToFetch

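To summarize how the OutOfRange case is handled:

If the follower has been offline for so long that its LEO is below the leader's start offset, it clears its log and syncs from leaderStartOffset.

If an unclean election took place, so that the follower's LEO is now greater than the leader's LEO, the follower truncates to the leader's LEO and syncs from there. Keep in mind that the replicas may hold inconsistent data in this case.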
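The out-of-range handling can be written as a pure decision function over three offsets and one flag. This is a minimal sketch with hypothetical names (`OffsetOutOfRangeSketch`, `Action`, `decide`). The `TruncateTo` branch is an assumption about what happens when unclean election is allowed, since the excerpt only shows the exit path of that case.

```scala
object OffsetOutOfRangeSketch {
  sealed trait Action
  case object Halt extends Action                                      // unclean election disallowed: stop the broker
  final case class TruncateTo(offset: Long) extends Action             // assumed: drop the local tail, fetch from the leader's LEO
  final case class TruncateFullyAndStartAt(offset: Long) extends Action // local log is entirely too old
  final case class FetchFrom(offset: Long) extends Action              // no truncation needed

  def decide(leaderStartOffset: Long,
             leaderEndOffset: Long,
             localLogEndOffset: Long,
             uncleanElectionAllowed: Boolean): Action =
    if (leaderEndOffset < localLogEndOffset) {
      // The follower is ahead of the leader, which only happens after an unclean election.
      if (uncleanElectionAllowed) TruncateTo(leaderEndOffset) else Halt
    } else if (leaderStartOffset > localLogEndOffset) {
      // The follower is behind the leader's retained data.
      TruncateFullyAndStartAt(leaderStartOffset)
    } else {
      FetchFrom(math.max(leaderStartOffset, localLogEndOffset))
    }
}
```

Applied to the two scenarios in the text: the old leader with an LEO of 1000 facing a new leader with an LEO of 800 either truncates to 800 or halts, and the replica with an LEO of 10 facing a leader log of [100, 1000] clears its log and restarts at 100.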

That covers the basic Fetch flow.
