我们说一下消息队列的生产和消费两个核心的流程,在其中大部分的消息队列中,流程是一致的,通过这次学习,我们可以掌握并理解消息队列收发消息的问题

本次是Kafka消费者的源代码,理解kafka的消费实现过程

Kafka的消费模型中的几个特点

Kafka的每个消费者Consumer属于一个消费组ConsumerGroup

消费的时候,CosumerGroup的每个Consumer独占一个或者多个Partition分区

对于每个ConsumerGroup,任意时刻,每个Partition至多有一个Consumer在消费

每个ConsumerGruop都有一个Coordinator协调者负责分配Consumer和Partition的对应关系

每当Partition或者Consumer发生改变的时候,其会重新分配

Consumer利用心跳请求维持的Coordinator的关系,方便出现故障及时触发rebalance

那么我们看一下Kafka相关的文档

https://kafka.apache.org/10/javadoc/?org/apache/kafka/clients/consumer/KafkaConsumer.html

这里给出了Kafka的Consumer最简化消费代码

Properties props = new Properties();

props.put(“bootstrap.servers”, “localhost:9092”);

props.put(“group.id”, “test”);

props.put(“enable.auto.commit”, “true”);

props.put(“auto.commit.interval.ms”, “1000”);

props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);

props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList(“foo”, “bar”));

while (true) {

ConsumerRecords<String, String> records = consumer.poll(100);

for (ConsumerRecord<String, String> record : records)

System.out.printf(“offset = %d, key = %s, value = %s%n”, record.offset(), record.key(), record.value());

}

上述代码的流程基本为

1.设置必备的配置信息,配置了Broker,Consumer Group的ID,自动提交消费位置,序列化配置

2.创建了一个Consumer的实例

3.设置了两个Topic foo bar

4.循环拉取消息

在上述的整体流程中,主要涉及的流程分为了 订阅 and 拉取消息

那么我们首先看一下订阅的流程如何实现

1.订阅的流程

我们先看下subscribe这个函数

//代码2:

@Override

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {

acquireAndEnsureOpen();

try {

maybeThrowInvalidGroupIdException();

if (topics == null)

throw new IllegalArgumentException(“Topic collection to subscribe to cannot be null”);

if (topics.isEmpty()) {

// treat subscribing to empty topic list as the same as unsubscribing

this.unsubscribe();

} else {

for (String topic : topics) {

if (topic == null || topic.trim().isEmpty())

throw new IllegalArgumentException(“Topic collection to subscribe to cannot contain null or empty topic”);

}

throwIfNoAssignorsConfigured();

fetcher.clearBufferedDataForUnassignedTopics(topics);

log.info(“Subscribed to topic(s): {}”, Utils.join(topics, “, “));

if (this.subscriptions.subscribe(new HashSet<>(topics), listener))

//元数据更新

metadata.requestUpdateForNewTopics();

}

} finally {

release();

}

}

 

上述代码,简化为

1,调用了保证线程安全的方法

2.进行参数校验.去掉空格等操作

3.重新进行订阅了

4.更新了元数据,包括集群的Broker节点等信息

主要是重置了订阅状态,然后更新了topic信息,订阅状态subscriptions维护了订阅的topic和patition的消费信息,

然后metadata中维护了Kafka的一个子集,包含了集群的Broker节点,Topic,Partition信息

上面中,有acquireAndEnsureOpen 和 try-finally release()

保证单线程运行

这也是为Consumer不是线程安全的,避免出现问题

我们增加健壮性的方式

我们看下acquireAndEnsureOpen 这个函数的实现

本质上是调用了acquire函数,而acquire函数代码如下

private void acquire() {

long threadId = Thread.currentThread().getId();

if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))

throw new ConcurrentModificationException(“KafkaConsumer is not safe for multi-threaded access”);

refcount.incrementAndGet();

}

我们尝试的获取线程,并比较,不相等的时候并尝试将并发线程设置为当前线程id,然后将引用数加一,失败则quick false

然后在最后进行release,释放线程

private void release() {

if (refcount.decrementAndGet() == 0)

currentThread.set(NO_CURRENT_THREAD);

}

release中则是将引用数减一,如果为0了,则设置为可并发模式

然后我们更新元数据中的metadata.setTopic()函数,我们更新了Metadata的Topic之外,还调用metadata.requestUpdate()方法请求更新元数据

在requestUpdate函数中,我们发现,其实没有真正的更新元数据的请求,只是将needUpdate这个标志位设置为了true就结束了,Kafka必须要在第一次拉取消息之前更新一次元数据

public synchronized int requestUpdateForNewTopics() {

// Override the timestamp of last refresh to let immediate update.

this.lastRefreshMs = 0;

this.needPartialUpdate = true;

this.requestVersion++;

return this.updateVersion;

}

 

上述订阅的流程中,我们没有和网络有任何的交互

更新了订阅状态subscriptions和元数据metadata的相关topic的一些属性,元数据设置为了需要立刻更新,没有真正的发送更新请求

真正的更新则是在poll流程中

图片

我们看一下源码流程

/**

* @throws KafkaException if the rebalance callback throws exception

*/

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {

 

acquireAndEnsureOpen();

try {this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {

 

throw new IllegalStateException(“Consumer is not subscribed to any topics or assigned any partitions”);

}

do {

client.maybeTriggerWakeup();

if (includeMetadataInTimeout) {

// try to update assignment metadata BUT do not need to block on the timer for join group

updateAssignmentMetadataIfNeeded(timer, false);

} else {

/*

updateAssignmentMetadataIfNeeded方法有3个作用:

– discovery coordinator if necessary

– join group if necessary

– refresh metadata and fetch position if necessary

*/

while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {

log.warn(“Still waiting for metadata”);

}

}

final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);

if (!records.isEmpty()) {

// before returning the fetched records, we can send off the next round of fetches

// and avoid block waiting for their responses to enable pipelining while the user

// is handling the fetched records.

//

// NOTE: since the consumed position has already been updated, we must not allow

// wakeups or any other errors to be triggered prior to returning the fetched records.

if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {

client.transmitSends();

}

return this.interceptors.onConsume(new ConsumerRecords<>(records));

}

} while (timer.notExpired());

return ConsumerRecords.empty();

} finally {

release();

this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());

}

}

poll中我们分别调用了

我们指定的事件内不断的进行工作,上来进行元数据的同步,这一步利用了updateAssignmentMetadataIfNeeded() 更新元数据

boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {

if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {

return false;

}

/*

Set the fetch position to the committed position (if there is one) or reset it using the offset reset policy the user has configured.

*/

return updateFetchPositions(timer);

}

这里的操作,主要是去发现coordinator

加入一个组

刷新metadata

在其中,我们调用了coordinator.poll(),内部代码如下

public boolean poll(Timer timer, boolean waitForJoinGroup) {

maybeUpdateSubscriptionMetadata();

invokeCompletedOffsetCommitCallbacks();

if (subscriptions.hasAutoAssignedPartitions()) {

if (protocol == null) {

throw new IllegalStateException(“User configured ” + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +

” to empty while trying to subscribe for group protocol to auto assign partitions”);

}

// Always update the heartbeat last poll time so that the heartbeat thread does not leave the

// group proactively due to application inactivity even if (say) the coordinator cannot be found.

pollHeartbeat(timer.currentTimeMs());

if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {

return false;

}

if (rejoinNeededOrPending()) {

// due to a race condition between the initial metadata fetch and the initial rebalance,

// we need to ensure that the metadata is fresh before joining initially. This ensures

// that we have matched the pattern against the cluster’s topics at least once before joining.

if (subscriptions.hasPatternSubscription()) {

// For consumer group that uses pattern-based subscription, after a topic is created,

// any consumer that discovers the topic after metadata refresh can trigger rebalance

// across the entire consumer group. Multiple rebalances can be triggered after one topic

// creation if consumers refresh metadata at vastly different times. We can significantly

// reduce the number of rebalances caused by single topic creation by asking consumer to

// refresh metadata before re-joining the group as long as the refresh backoff time has

// passed.

if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {

this.metadata.requestUpdate();

}

if (!client.ensureFreshMetadata(timer)) {

return false;

}

maybeUpdateSubscriptionMetadata();

}

// if not wait for join group, we would just use a timer of 0

if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {

// since we may use a different timer in the callee, we’d still need

// to update the original timer’s current time after the call

timer.update(time.milliseconds());

return false;

}

}

} else {

// For manually assigned partitions, if there are no ready nodes, await metadata.

// If connections to all nodes fail, wakeups triggered while attempting to send fetch

// requests result in polls returning immediately, causing a tight loop of polls. Without

// the wakeup, poll() with no channels would block for the timeout, delaying re-connection.

// awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.

// When group management is used, metadata wait is already performed for this scenario as

// coordinator is unknown, hence this check is not required.

if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {

client.awaitMetadataUpdate(timer);

}

}

maybeAutoCommitOffsetsAsync(timer.currentTimeMs());

return true;

}

然后在updateAssignmentMetadataIfNeeded()函数中,调用了coordinator.poll()方法

poll()方法中调用了client的ensureFreshMetadata()方法,其中还调用了client.poll()方法,实现和Cluster通信,Coordinatro上注册Consumer并拉取和更新元数据

client.poll代码如下

public void poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) {

// there may be handlers which need to be invoked if we woke up the previous call to poll

firePendingCompletedRequests();

lock.lock();

try {

// Handle async disconnects prior to attempting any sends

handlePendingDisconnects();

// send all the requests we can send now

long pollDelayMs = trySend(timer.currentTimeMs());

// check whether the poll is still needed by the caller. Note that if the expected completion

// condition becomes satisfied after the call to shouldBlock() (because of a fired completion

// handler), the client will be woken up.

if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {

// if there are no requests in flight, do not block longer than the retry backoff

long pollTimeout = Math.min(timer.remainingMs(), pollDelayMs);

if (client.inFlightRequestCount() == 0)

pollTimeout = Math.min(pollTimeout, retryBackoffMs);

client.poll(pollTimeout, timer.currentTimeMs());

} else {

client.poll(0, timer.currentTimeMs());

}

timer.update();

// handle any disconnects by failing the active requests. note that disconnects must

// be checked immediately following poll since any subsequent call to client.ready()

// will reset the disconnect status

checkDisconnects(timer.currentTimeMs());

if (!disableWakeup) {

// trigger wakeups after checking for disconnects so that the callbacks will be ready

// to be fired on the next call to poll()

maybeTriggerWakeup();

}

// throw InterruptException if this thread is interrupted

maybeThrowInterruptException();

// try again to send requests since buffer space may have been

// cleared or a connect finished in the poll

trySend(timer.currentTimeMs());

// fail requests that couldn’t be sent if they have expired

failExpiredRequests(timer.currentTimeMs());

// clean unsent requests collection to keep the map from growing indefinitely

unsent.clean();

} finally {

lock.unlock();

}

// called without the lock to avoid deadlock potential if handlers need to acquire locks

firePendingCompletedRequests();

metadata.maybeThrowAnyException();

}

然后ConsumerNetworkClient封装了Consumer和Cluster之间的网络通信

一个彻底的一步实现,所有待发送的Request放在属性unsent,返回的Response放在属性pendingCompletion中,每次调用poll,发送所有的Request,处理所有的Response

这样,我们利用异步设计的优势来很少的线程实现高吞吐量,但劣势也很明显,增加了代码的复杂度,

其实,更新消费位置的方法,也是在updateAssignmentMetadataIfNeeded()函数中,

其中调用了KafkaConsumer.updateFetchPositions(timer)

然后又会调用coordinator.refreshCommittedOffsetsIfNeeded()

然后是拉取消息的pollForFetches()的实现

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {

        // 省略部分代码

        // 如果缓存里面有未读取的消息,直接返回这些消息

        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

        if (!records.isEmpty()) {

            return records;

        }

        // 构造拉取消息请求,并发送

        fetcher.sendFetches();

        // 省略部分代码

        // 发送网络请求拉取消息,等待直到有消息返回或者超时

        client.poll(pollTimer, () -> {

            return !fetcher.hasCompletedFetches();

        });

        // 省略部分代码

        // 返回拉到的消息

        return fetcher.fetchedRecords();

    }

在这串代码中

我们首先看缓存中有未读取的消息,直接返回

构造拉取消息请求,发送

发送网络消息等响应

返回拉取到的信息

实际逻辑在sendFetchs实现类中,根据元数据的信息,构造所有需要的broker的拉消息的Request

public synchronized int sendFetches() {

// Update metrics in case there was an assignment change

sensors.maybeUpdateAssignment(subscriptions);

Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();

for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {

final Node fetchTarget = entry.getKey();

final FetchSessionHandler.FetchRequestData data = entry.getValue();

final FetchRequest.Builder request = FetchRequest.Builder

.forConsumer(this.maxWaitMs, this.minBytes, data.toSend())

.isolationLevel(isolationLevel)

.setMaxBytes(this.maxBytes)

.metadata(data.metadata())

.toForget(data.toForget())

.rackId(clientRackId);

log.debug(“Sending {} {} to broker {}”, isolationLevel, data, fetchTarget);

RequestFuture<ClientResponse> future = client.send(fetchTarget, request);

// We add the node to the set of nodes with pending fetch requests before adding the

// listener because the future may have been fulfilled on another thread (e.g. during a

// disconnection being handled by the heartbeat thread) which will mean the listener

// will be invoked synchronously.

this.nodesWithPendingFetchRequests.add(entry.getKey().id());

future.addListener(new RequestFutureListener<ClientResponse>() {

@Override

public void onSuccess(ClientResponse resp) {

synchronized (Fetcher.this) {

try {

@SuppressWarnings(“unchecked”)

FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();

FetchSessionHandler handler = sessionHandler(fetchTarget.id());

if (handler == null) {

log.error(“Unable to find FetchSessionHandler for node {}. Ignoring fetch response.”,

fetchTarget.id());

return;

}

if (!handler.handleResponse(response)) {

return;

}

Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());

FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {

TopicPartition partition = entry.getKey();

FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);

if (requestData == null) {

String message;

if (data.metadata().isFull()) {

message = MessageFormatter.arrayFormat(

“Response for missing full request partition: partition={}; metadata={}”,

new Object[]{partition, data.metadata()}).getMessage();

} else {

message = MessageFormatter.arrayFormat(

“Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}”,

new Object[]{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage();

}

// Received fetch response for missing session partition

throw new IllegalStateException(message);

} else {

long fetchOffset = requestData.fetchOffset;

FetchResponse.PartitionData<Records> partitionData = entry.getValue();

log.debug(“Fetch {} at offset {} for partition {} returned fetch data {}”,

isolationLevel, fetchOffset, partition, partitionData);

Iterator<? extends RecordBatch> batches = partitionData.records().batches().iterator();

short responseVersion = resp.requestHeader().apiVersion();

completedFetches.add(new CompletedFetch(partition, partitionData,

metricAggregator, batches, fetchOffset, responseVersion, resp.receivedTimeMs()));

}

}

sensors.fetchLatency.record(resp.requestLatencyMs());

} finally {

nodesWithPendingFetchRequests.remove(fetchTarget.id());

}

}

}

@Override

public void onFailure(RuntimeException e) {

synchronized (Fetcher.this) {

try {

FetchSessionHandler handler = sessionHandler(fetchTarget.id());

if (handler != null) {

handler.handleError(e);

}

} finally {

nodesWithPendingFetchRequests.remove(fetchTarget.id());

}

}

}

});

}

return fetchRequestMap.size();

}

最后,fetcher.fetchedRecords()方法,将返回的Response反序列化转换给消息列表,给调用者

图片

上面流程整体为

构建Request对象,存入发送队列,不会立刻发送,而是等待时机批量发送,然后用回调或者RequestFuture的方法,预先定义好处理响应的逻辑

收到Broker的相应,也是暂时存在队列中,择机处理,这个择机处理的逻辑很复杂,可能是读取相应,或者缓冲区慢了,或者时机到了

发表评论

邮箱地址不会被公开。 必填项已用*标注