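This post focuses on the Consumer side of Kafka.

It summarizes how a consumer consumes messages, and how the work is shared when multiple consumers read from multiple Partitions.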

In the simplest model, one Topic is consumed by one Consumer.


A single consumer reads slowly, however, so on the server side each Topic is split into multiple Partitions.


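This raises the throughput of both the Producer and the server, because writes are load-balanced across the Partitions.

We also have to consider that there may be more than one Consumer.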


To handle multiple Consumers, Kafka provides the concept of a consumer group.

This is the Consumer Group: each Consumer is configured with a group id (group.id) when it starts.


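This looks neat, but consider what happens when another consumer is added to the group.

If two consumers of the same group read the same Partition, the messages of that Partition can no longer be consumed in order.

Kafka therefore imposes a rule: within one consumer group, each Partition of a Topic is consumed by exactly one consumer.

As a consequence, a consumer group should not have more consumers than the subscribed Topics have Partitions; otherwise some Consumers are assigned nothing and sit idle.

When a consumer in the group fails, its Partitions have to be redistributed among the remaining members. This operation is called a rebalance, and it is expensive.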


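A consumer group also needs a manager. This manager is called the Coordinator (GroupCoordinator), and one runs on every Broker.

How is a Consumer Group mapped to the Coordinator of a particular Broker?

The mapping is derived from where the group's committed offsets are stored (details follow later). In short, the group's offsets are written to one partition of the internal topic __consumer_offsets, and the Coordinator on the Broker that hosts that partition serves as the coordinator of the consumer group.

How Topics and Partitions are distributed inside a Consumer Group is decided by the group's Leader. The Consumer Group Leader is the first Consumer to join the Group.

The Leader applies the Topic-Partition (TP) assignment strategy, so that consumption is organized at the Consumer Group level.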

Kafka provides four assignment strategies:

range, round-robin, sticky, and cooperative.

Let's walk through the logic of each.

1. First, the range strategy

The logic is simple:

// Excerpt from the body of the range assignor's assign() method
for (String memberId : subscriptions.keySet())
    assignment.put(memberId, new ArrayList<TopicPartition>());

for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
    String topic = topicEntry.getKey();
    List<String> consumersForTopic = topicEntry.getValue();

    Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
    if (numPartitionsForTopic == null)
        continue;

    Collections.sort(consumersForTopic);

    // base share: partitions of the topic divided by the number of consumers
    int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
    // remainder: how many consumers receive one extra partition
    int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

    List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
    for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
        int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
        int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
        assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
    }
}

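The code first computes, from the topic's partition count, the base number of partitions each consumer should receive:

int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();  // partitions of the topic divided by the number of consumers

Then it computes how many TPs are left after the base share has been handed out, i.e. the remainder:

int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();  // number of leftover partitions

Every consumer gets its base share as one contiguous range, and the first consumersWithExtraPartition consumers (in sorted order) each get one extra partition.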

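Suppose there are two topics, t1 and t2, with 10 partitions each, and three consumer threads C1-0, C2-0 and C2-1. The final assignment is: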

C1-0:T1(0,1,2,3) T2(0,1,2,3)

C2-0:T1(4,5,6) T2(4,5,6)

C2-1:T1(7,8,9) T2(7,8,9)

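This strategy has a serious drawback. The assignment is computed per topic, so whenever a topic's partition count does not divide evenly among the consumers, the extra partitions go to the first consumers in sorted order. The more such topics there are, the more partitions pile up on the first Consumer.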
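The arithmetic above can be tried out in isolation. The following is a minimal sketch, not the Kafka implementation: the class name RangeAssignSketch and the method assignTopic are made up for illustration, and partitions are represented by their plain integer index for a single topic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class RangeAssignSketch {
    /** Applies the range rule to ONE topic and returns the partition indexes per consumer. */
    static Map<String, List<Integer>> assignTopic(List<String> consumers, int numPartitions) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        int base = numPartitions / sorted.size();   // share every consumer receives
        int extra = numPartitions % sorted.size();  // the first `extra` consumers get one more
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            int start = base * i + Math.min(i, extra);
            int length = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                owned.add(p);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("C1-0", "C2-0", "C2-1");
        // 10 partitions: prints {C1-0=[0, 1, 2, 3], C2-0=[4, 5, 6], C2-1=[7, 8, 9]}
        System.out.println(assignTopic(consumers, 10));
        // 2 partitions: prints {C1-0=[0], C2-0=[1], C2-1=[]}, the third consumer is idle
        System.out.println(assignTopic(consumers, 2));
    }
}
```

Running the same call once per topic shows the drawback: with two 10-partition topics, C1-0 ends up with 8 partitions while the other two consumers hold 6 each.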

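2. Second, the RoundRobin strategy

All consumers in the group and all partitions of the subscribed Topics are sorted, and the partitions are then dealt out to the consumers one by one.

This requires every consumer in the group to subscribe to the same topics.

Suppose there are two topics with 4 partitions each.

The TPs are first sorted by hash code. Assume the resulting order is T2-0, T1-3, T1-0, T2-1, T1-2, T2-2, T1-1, T2-3.

The three consumer threads are sorted as C1, C2, C3.

Dealing the partitions out in turn gives: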

C1: T2-0,T2-1,T1-1

C2: T1-3,T1-2,T2-3

C3: T1-0,T2-2

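If the consumers in a group subscribe to different topics, the assignment becomes uneven: a consumer that has not subscribed to a topic receives none of that topic's partitions, so the remaining consumers have to absorb them.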
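The dealing step from the example can be reproduced with a few lines. This is a minimal sketch under the assumption that every consumer subscribes to the same topics and that both lists are already sorted; the class RoundRobinSketch is hypothetical and partitions are plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class RoundRobinSketch {
    /** Deals the already-sorted partitions to the already-sorted consumers, one at a time. */
    static Map<String, List<String>> assign(List<String> consumers, List<String> partitions) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<String>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            result.get(owner).add(partitions.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("C1", "C2", "C3");
        List<String> sortedTps = Arrays.asList(
                "T2-0", "T1-3", "T1-0", "T2-1", "T1-2", "T2-2", "T1-1", "T2-3");
        // prints {C1=[T2-0, T2-1, T1-1], C2=[T1-3, T1-2, T2-3], C3=[T1-0, T2-2]}
        System.out.println(assign(consumers, sortedTps));
    }
}
```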

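3. Third, the sticky strategy

It keeps the assignment balanced and, when a reassignment happens, keeps the result as close as possible to the previous assignment.

Suppose there are four topics t0, t1, t2 and t3 with two partitions each, and three consumers C0, C1 and C2. The group subscribes to eight partitions in total: t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1.

The initial assignment is:

Consumer C0: t0p0, t1p1, t3p0

Consumer C1: t0p1, t2p0, t3p1

Consumer C2: t1p0, t2p1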

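This looks the same as what RoundRobinAssignor produces. The difference shows up on a reassignment, for example when consumer C1 leaves the group.

Under RoundRobinAssignor the new assignment is:

Consumer C0: t0p0, t1p0, t2p0, t3p0

Consumer C2: t0p1, t1p1, t2p1, t3p1

A large number of TPs have changed owner.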

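With the sticky strategy, the result looks roughly like this:

Consumer C0: t0p0, t1p1, t3p0, t2p0

Consumer C2: t1p0, t2p1, t0p1, t3p1

The previous assignment is kept as it was, and only the TPs that belonged to C1 are divided among the remaining consumers.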
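The idea of "keep what you have, redistribute only the orphans" can be sketched as follows. This is a deliberately simplified illustration and not Kafka's sticky assignor: the class StickyReassignSketch is hypothetical, it only handles a single consumer leaving, and it assumes ties are broken by consumer name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, for illustration only.
final class StickyReassignSketch {
    /**
     * Removes `leaving` from the previous assignment and hands each of its partitions
     * to the remaining consumer that currently owns the fewest (ties: lowest name).
     */
    static Map<String, List<String>> reassign(Map<String, List<String>> previous, String leaving) {
        Map<String, List<String>> next = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : previous.entrySet()) {
            if (!e.getKey().equals(leaving)) {
                next.put(e.getKey(), new ArrayList<>(e.getValue())); // existing TPs stay put
            }
        }
        List<String> orphaned = previous.containsKey(leaving)
                ? previous.get(leaving) : new ArrayList<String>();
        for (String tp : orphaned) {
            String target = null;
            for (String consumer : next.keySet()) { // TreeMap iterates in name order
                if (target == null || next.get(consumer).size() < next.get(target).size()) {
                    target = consumer;
                }
            }
            if (target == null) {
                break; // nobody left to take the partition
            }
            next.get(target).add(tp);
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, List<String>> previous = new TreeMap<>();
        previous.put("C0", Arrays.asList("t0p0", "t1p1", "t3p0"));
        previous.put("C1", Arrays.asList("t0p1", "t2p0", "t3p1"));
        previous.put("C2", Arrays.asList("t1p0", "t2p1"));
        // prints {C0=[t0p0, t1p1, t3p0, t2p0], C2=[t1p0, t2p1, t0p1, t3p1]}
        System.out.println(reassign(previous, "C1"));
    }
}
```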

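4. Finally, the cooperative strategy. It builds on the sticky strategy and adds rules for how the TPs that become unowned during a reassignment are handed out.

A reassignment is triggered when:

a consumer is added to or removed from the group

the topics a consumer subscribes to change

the partitions of a subscribed topic change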

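Next, let's look at the source code for how a Consumer joins a group.

As mentioned above, the mapping between GroupCoordinator and Consumer Group relies on a topic named __consumer_offsets.

It is an internal Kafka topic dedicated to storing each group's consumption progress. By default it has 50 partitions with 3 replicas each.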

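Given the __consumer_offsets topic, the coordinator of a consumer group is computed as follows:

abs(GroupId.hashCode()) % NumPartitions yields a number that always identifies one Partition of __consumer_offsets. The Coordinator on the node that hosts the leader of this Partition becomes the group's coordinator.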
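The formula can be checked with plain Java. This is a minimal sketch: the class CoordinatorPartitionSketch and its methods are hypothetical, and the explicit guard for Integer.MIN_VALUE is an assumption added here because Math.abs cannot make that one value non-negative.

```java
// Hypothetical helper, for illustration only.
final class CoordinatorPartitionSketch {
    /** Index of the __consumer_offsets partition that a group id maps to. */
    static int partitionFor(String groupId, int numOffsetsPartitions) {
        return partitionForHash(groupId.hashCode(), numOffsetsPartitions);
    }

    static int partitionForHash(int hash, int numOffsetsPartitions) {
        // Math.abs(Integer.MIN_VALUE) is still negative, so map that value to 0 explicitly.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numOffsetsPartitions;
    }

    public static void main(String[] args) {
        // "a".hashCode() is 97, and 97 % 50 is 47
        System.out.println(partitionFor("a", 50)); // prints 47
    }
}
```

Because the result depends only on the group id and the partition count, every client and broker computes the same partition, and therefore the same coordinator, without any extra lookup table.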

When a Consumer starts and subscribes to a topic, it triggers the procedure for joining the Consumer Group.

The relevant code lives in ConsumerCoordinator.poll():

// Ensures that the group's coordinator is known and that this consumer has joined the group;
// also drives the periodic offset commit.
public void poll(long now) {
    invokeCompletedOffsetCommitCallbacks(); // run the callbacks of completed offset commits

    // Step 1: topics were subscribed via subscribe() and the coordinator is unknown,
    // so locate the GroupCoordinator
    if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
        // look up the GroupCoordinator address and connect to it
        ensureCoordinatorReady();
        now = time.milliseconds();
    }

    // Step 2: check whether the group must be rejoined, which is the case when the
    // subscription or the partitions of the subscribed topics have changed
    if (needRejoin()) {
        // due to a race condition between the initial metadata fetch and the initial rebalance,
        // we need to ensure that the metadata is fresh before joining initially. This ensures
        // that we have matched the pattern against the cluster's topics at least once before joining.
        // (refresh the metadata before rejoining; only relevant for AUTO_PATTERN subscriptions)
        if (subscriptions.hasPatternSubscription())
            client.ensureFreshMetadata();

        // make sure the group is active: join the group and receive the assigned partitions
        ensureActiveGroup();
        now = time.milliseconds();
    }

    // Step 3: check the heartbeat thread; throw if it has failed, otherwise record the poll time
    pollHeartbeat(now);
    // Step 4: with auto-commit enabled, commit offsets once the commit interval has elapsed
    maybeAutoCommitOffsetsAsync(now);
}

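The poll method can be divided into the following steps.

First, it initializes the Consumer Coordinator, which requires looking up the GroupCoordinator address and connecting to it.

Then it checks whether the group must be rejoined; if the subscribed partitions have changed, a rejoin is needed.

Then it checks the state of the heartbeat thread.

Finally, it performs the auto-commit if one is due.

The two key calls are

ensureCoordinatorReady in step 1 and ensureActiveGroup in step 2.

First, ensureCoordinatorReady().

It picks the least-loaded broker and asks it which broker is the GroupCoordinator for this group.

Here is the source: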

protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
    long remainingMs = timeoutMs;

    while (coordinatorUnknown()) {
        // look up the GroupCoordinator and connect to it
        RequestFuture<Void> future = lookupCoordinator();
        client.poll(future, remainingMs);

        if (future.failed()) { // the lookup failed
            if (future.isRetriable()) {
                remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
                if (remainingMs <= 0)
                    break;

                log.debug("Coordinator discovery failed for group {}, refreshing metadata", groupId);
                client.awaitMetadataUpdate(remainingMs);
            } else
                throw future.exception();
        } else if (coordinator != null && client.connectionFailed(coordinator)) {
            // we found the coordinator, but the connection has failed, so mark
            // it dead and backoff before retrying discovery
            coordinatorDead();
            time.sleep(retryBackoffMs);
        }

        remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
        if (remainingMs <= 0)
            break;
    }

    return !coordinatorUnknown();
}

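This method sends a GroupCoordinator lookup request to the least-loaded Broker.

If the lookup fails with a retriable error, it waits for a metadata update and sends the request again, until the timeout is used up.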
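The shape of that loop (try, compute the remaining time, back off, try again) is independent of Kafka. The following is a minimal sketch of the pattern only; the class RetryUntilDeadlineSketch and its retry method are hypothetical, and a fixed backoff sleep stands in for the metadata refresh of the real code.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, for illustration only.
final class RetryUntilDeadlineSketch {
    /**
     * Calls `attempt` until it returns true or `timeoutMs` has elapsed, sleeping
     * `backoffMs` between failed attempts. Returns whether an attempt succeeded.
     */
    static boolean retry(BooleanSupplier attempt, long timeoutMs, long backoffMs) {
        long startMs = System.currentTimeMillis();
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            long remainingMs = timeoutMs - (System.currentTimeMillis() - startMs);
            if (remainingMs <= 0) {
                return false; // deadline reached without success
            }
            try {
                Thread.sleep(Math.min(backoffMs, remainingMs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return false;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // succeeds on the third attempt
        boolean found = retry(() -> ++calls[0] >= 3, 1000, 10);
        System.out.println(found + " after " + calls[0] + " attempts"); // true after 3 attempts
    }
}
```

Recomputing the remaining time from the start timestamp on every iteration, as the original does, keeps the total wait bounded by the timeout no matter how long an individual attempt takes.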

// Picks the least-loaded node and sends it a GroupCoordinator request
protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // find a node to ask about the coordinator
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            // TODO: If there are no brokers left, perhaps we should use the bootstrap set
            // from configuration?
            log.debug("No broker available to send GroupCoordinator request for group {}", groupId);
            return RequestFuture.noBrokersAvailable();
        } else
            findCoordinatorFuture = sendGroupCoordinatorRequest(node); // send the request and handle the response
    }
    return findCoordinatorFuture;
}

Before sending, the method obtains the least-loaded node.

Then it sends the request:

private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
    // initiate the group metadata request
    log.debug("Sending GroupCoordinator request for group {} to broker {}", groupId, node);
    GroupCoordinatorRequest.Builder requestBuilder =
            new GroupCoordinatorRequest.Builder(this.groupId);
    // compose() adapts the raw response future through GroupCoordinatorResponseHandler and
    // returns a new RequestFuture; in effect the handler's onSuccess() and onFailure()
    // decide how the returned future completes
    return client.send(node, requestBuilder)
            .compose(new GroupCoordinatorResponseHandler());
}

private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {

    @Override
    public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
        log.debug("Received GroupCoordinator response {} for group {}", resp, groupId);

        GroupCoordinatorResponse groupCoordinatorResponse = (GroupCoordinatorResponse) resp.responseBody();
        // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
        // for the coordinator in the underlying network client layer
        // TODO: this needs to be better handled in KAFKA-1935
        Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
        clearFindCoordinatorFuture();
        if (error == Errors.NONE) {
            // the GroupCoordinator was found: connect to it and reset the heartbeat timers
            synchronized (AbstractCoordinator.this) {
                AbstractCoordinator.this.coordinator = new Node(
                        Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
                        groupCoordinatorResponse.node().host(),
                        groupCoordinatorResponse.node().port());
                log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
                client.tryConnect(coordinator);                  // open the TCP connection
                heartbeat.resetTimeouts(time.milliseconds());    // reset the heartbeat timers
            }
            future.complete(null);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(new GroupAuthorizationException(groupId));
        } else {
            log.debug("Group coordinator lookup for group {} failed: {}", groupId, error.message());
            future.raise(error);
        }
    }

    @Override
    public void onFailure(RuntimeException e, RequestFuture<Void> future) {
        clearFindCoordinatorFuture();
        super.onFailure(e, future);
    }
}

The send registers a callback that processes the response, and the main logic lives in that callback.

In onSuccess above, if the GroupCoordinator was obtained without error, the client tries to connect to it:

AbstractCoordinator.this.coordinator = new Node(
        Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
        groupCoordinatorResponse.node().host(),
        groupCoordinatorResponse.node().port());
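The Integer.MAX_VALUE - id trick is explained by the comment in the handler: it makes the coordinator look like a different node so that the network layer keeps a separate connection for it. The following toy sketch illustrates the effect with a map keyed by node id; the class CoordinatorNodeIdSketch and the map of "connections" are hypothetical stand-ins, not the real network client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class CoordinatorNodeIdSketch {
    /** Node id used for the coordinator role of the broker with the given id. */
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    public static void main(String[] args) {
        int brokerId = 3;
        // stand-in for a connection table keyed by node id
        Map<Integer, String> connections = new LinkedHashMap<>();
        connections.put(brokerId, "regular traffic to broker 3");
        connections.put(coordinatorConnectionId(brokerId), "coordinator traffic to broker 3");

        System.out.println(connections.size());                // prints 2
        System.out.println(coordinatorConnectionId(brokerId)); // prints 2147483644
    }
}
```

Both entries point at the same host and port, but because the ids differ, coordinator requests such as heartbeats do not queue behind other requests to the same broker.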

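Next comes ensureActiveGroup, mentioned at the start. This is the step in which the consumer actually joins the group.

The overall call sequence (ensureActiveGroup calls the other three in order):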

ensureActiveGroup -> ensureCoordinatorReady -> startHeartbeatThreadIfNeeded -> joinGroupIfNeeded

public void ensureActiveGroup() {
    // always ensure that the coordinator is ready because we may have been disconnected
    // when sending heartbeats and does not necessarily require us to rejoin the group.
    ensureCoordinatorReady();        // make sure the GroupCoordinator is connected
    startHeartbeatThreadIfNeeded();  // start the heartbeat thread (heartbeats are only sent once the conditions are met)
    joinGroupIfNeeded();             // send the JoinGroup request and process the response
}

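Joining the group consists of initializing the join, sending the join-group request, running

onJoinLeader or onJoinFollower, and finally calling sendSyncGroupRequest and processing its response.

First, the implementation of joinGroupIfNeeded: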

// join group
public void joinGroupIfNeeded() {
    while (needRejoin() || rejoinIncomplete()) {
        // make sure once more that the GroupCoordinator is ready
        ensureCoordinatorReady();

        // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
        // time if the client is woken up before a pending rebalance completes. This must be called
        // on each iteration of the loop because an event requiring a rebalance (such as a metadata
        // refresh which changes the matched subscription set) can occur while another rebalance is
        // still in progress.
        // (onJoinPrepare covers the offset commit and the rebalance listener)
        if (needsJoinPrepare) {
            onJoinPrepare(generation.generationId, generation.memberId);
            needsJoinPrepare = false;
        }

        // build the JoinGroup request and send it
        RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future);
        resetJoinGroupFuture(); // reset joinFuture to null

        if (future.succeeded()) { // join succeeded; at this point sync-group has in fact succeeded as well
            needsJoinPrepare = true;
            onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
        } else {
            RuntimeException exception = future.exception();
            if (exception instanceof UnknownMemberIdException ||
                    exception instanceof RebalanceInProgressException ||
                    exception instanceof IllegalGenerationException)
                continue;
            else if (!future.isRetriable())
                throw exception;
            time.sleep(retryBackoffMs);
        }
    }
}

The key call here is initiateJoinGroup():

// Sends the JoinGroup request and attaches a listener
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    // we store the join future in case we are woken up by the user after beginning the
    // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
    // to rejoin before the pending rebalance has completed.
    if (joinFuture == null) {
        // fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
        // Note that this must come after the call to onJoinPrepare since we must be able to continue
        // sending heartbeats if that callback takes some time.
        // (the heartbeat thread is paused during the rebalance)
        disableHeartbeatThread();

        state = MemberState.REBALANCING;      // mark the member as rebalancing
        joinFuture = sendJoinGroupRequest();  // send the JoinGroup request
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // handle join completion in the callback so that the callback will be invoked
                // even if the consumer is woken up before finishing the rebalance
                synchronized (AbstractCoordinator.this) {
                    log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);
                    state = MemberState.STABLE; // mark the consumer as stable

                    if (heartbeatThread != null)
                        heartbeatThread.enable();
                }
            }

            @Override
            public void onFailure(RuntimeException e) {
                // we handle failures below after the request finishes. if the join completes
                // after having been woken up, the exception is ignored and we will rejoin
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.UNJOINED; // mark the consumer as unjoined
                }
            }
        });
    }
    return joinFuture;
}

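This code is straightforward: it sends the request to join the group, and in the callback it records the Consumer's state and re-enables the heartbeat thread.

Note that sendJoinGroupRequest uses compose() to install a handler that gets the first chance to process the response: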
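The state handling in that callback can be modelled with a standard CompletableFuture. This is a minimal sketch, not the Kafka classes: MemberStateSketch is hypothetical, the future stands in for the pending join/sync round trip, and a boolean flag stands in for the heartbeat thread.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model, for illustration only.
final class MemberStateSketch {
    enum MemberState { UNJOINED, REBALANCING, STABLE }

    MemberState state = MemberState.UNJOINED;
    boolean heartbeatEnabled = false;

    /** Starts a join; `joinResult` completes when the (simulated) join and sync have finished. */
    synchronized void initiateJoin(CompletableFuture<String> joinResult) {
        heartbeatEnabled = false;              // fence off heartbeats during the rebalance
        state = MemberState.REBALANCING;
        joinResult.whenComplete((assignment, error) -> {
            synchronized (MemberStateSketch.this) {
                if (error == null) {
                    state = MemberState.STABLE;
                    heartbeatEnabled = true;   // resume heartbeats once the join succeeded
                } else {
                    state = MemberState.UNJOINED;
                }
            }
        });
    }

    public static void main(String[] args) {
        MemberStateSketch member = new MemberStateSketch();
        CompletableFuture<String> join = new CompletableFuture<>();
        member.initiateJoin(join);
        System.out.println(member.state);      // prints REBALANCING
        join.complete("t1-0,t1-1");
        System.out.println(member.state);      // prints STABLE
    }
}
```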

private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();

    // send a join group request to the coordinator
    log.info("(Re-)joining group {}", groupId);
    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
            groupId,
            this.sessionTimeoutMs,
            this.generation.memberId,
            protocolType(),
            metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);

    log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
    return client.send(coordinator, requestBuilder)
            .compose(new JoinGroupResponseHandler());
}

The handler contains the onJoinLeader and onJoinFollower branches:

// Handler for the JoinGroup response (continues with the group sync)
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        Errors error = Errors.forCode(joinResponse.errorCode());
        if (error == Errors.NONE) {
            log.debug("Received successful JoinGroup response for group {}: {}", groupId, joinResponse);
            sensors.joinLatency.record(response.requestLatencyMs());

            synchronized (AbstractCoordinator.this) {
                if (state != MemberState.REBALANCING) { // the consumer is no longer rebalancing: raise an error
                    // if the consumer was woken up before a rebalance completes, we may have already left
                    // the group. In this case, we do not want to continue with the sync group.
                    future.raise(new UnjoinedGroupException());
                } else {
                    AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
                            joinResponse.memberId(), joinResponse.groupProtocol());
                    AbstractCoordinator.this.rejoinNeeded = false;
                    // join-group succeeded; next comes sync-group to obtain the assigned TP list
                    if (joinResponse.isLeader()) {
                        onJoinLeader(joinResponse).chain(future);
                    } else {
                        onJoinFollower().chain(future);
                    }
                }
            }
        // the branches below handle error responses and can be skipped on a first read
        } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
            log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
                    coordinator());
            // backoff and retry
            future.raise(error);
        } else if (error == Errors.UNKNOWN_MEMBER_ID) {
            // reset the member id and retry immediately
            resetGeneration();
            log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
            future.raise(Errors.UNKNOWN_MEMBER_ID);
        } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
            // re-discover the coordinator and retry with backoff
            coordinatorDead();
            log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message());
            future.raise(error);
        } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                || error == Errors.INVALID_SESSION_TIMEOUT
                || error == Errors.INVALID_GROUP_ID) {
            // log the error and re-throw the exception
            log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
            future.raise(error);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(new GroupAuthorizationException(groupId));
        } else {
            // unexpected error, throw the exception
            future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
        }
    }
}

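As the handler shows, if the Consumer is still in the REBALANCING state when a successful response arrives,

the response is processed according to whether this member is the leader or a follower.

On the coordinator side, when a group id is seen for the first time, the group is created in the Empty state.

When the GroupCoordinator receives a join-group request from a consumer and the group's member list is empty, the first member to join is made the leader.

The join-group request triggers a rebalance, and the group moves to the PreparingRebalance state.

The GroupCoordinator then waits for a while to determine which consumers are still alive, moves the group to the AwaitingSync state, and returns the responses.

Each Consumer receives the GroupCoordinator's response. The leader computes the TP assignment.

A follower sends a sync-group request with an empty assignment.

Once the GroupCoordinator receives the leader's request, it returns the assignment to every consumer instance that has sent a sync-group request and sets the group state to Stable. A sync-group request that arrives afterwards is answered directly, because the group is already Stable.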
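The coordinator-side flow described above can be condensed into a toy state machine. This is a simplified model and not the broker code: the class GroupStateSketch and its methods are hypothetical, timeouts and liveness checks are left out, and a follower that syncs before the leader simply gets null back instead of being parked until the leader's request arrives.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, for illustration only.
final class GroupStateSketch {
    enum State { EMPTY, PREPARING_REBALANCE, AWAITING_SYNC, STABLE }

    State state = State.EMPTY;
    String leaderId; // the first member to join
    final List<String> members = new ArrayList<>();
    Map<String, List<String>> assignment = new LinkedHashMap<>();

    /** JoinGroup: the first member becomes leader and a rebalance starts. */
    void join(String memberId) {
        if (members.isEmpty()) {
            leaderId = memberId;
        }
        if (!members.contains(memberId)) {
            members.add(memberId);
        }
        state = State.PREPARING_REBALANCE;
    }

    /** The join window closes: responses go out and the group waits for SyncGroup. */
    void completeJoin() {
        if (state == State.PREPARING_REBALANCE) {
            state = State.AWAITING_SYNC;
        }
    }

    /** SyncGroup: only the leader's proposal is used; returns null until the leader has synced. */
    List<String> sync(String memberId, Map<String, List<String>> proposed) {
        if (state == State.AWAITING_SYNC && memberId.equals(leaderId)) {
            assignment = new LinkedHashMap<>(proposed);
            state = State.STABLE;
        }
        if (state != State.STABLE) {
            return null;
        }
        List<String> mine = assignment.get(memberId);
        return mine != null ? mine : new ArrayList<String>();
    }
}
```

A typical run is join("c1"), join("c2"), completeJoin(), then sync("c1", plan) from the leader, after which sync("c2", emptyMap) returns c2's share of the plan.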

The implementations of onJoinLeader and onJoinFollower are as follows:

// When the consumer is a follower, it fetches the assignment from the GroupCoordinator
private RequestFuture<ByteBuffer> onJoinFollower() {
    // send follower's sync group with an empty assignment
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
                    Collections.<String, ByteBuffer>emptyMap());
    log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator,
            requestBuilder);
    return sendSyncGroupRequest(requestBuilder);
}

// When the consumer is the leader, it computes the assignment for every member of the group
// and sends the result to the GroupCoordinator
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // perform the leader synchronization and send back the assignment for the group
        Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                joinResponse.members()); // compute the assignment

        SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
        log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}",
                groupId, this.coordinator, requestBuilder);
        return sendSyncGroupRequest(requestBuilder); // send the sync-group request
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}

After sendSyncGroupRequest, the normal path leads to ConsumerCoordinator.onJoinComplete, which updates the metadata, including the list of assigned TPs:

// Joined the group successfully
@Override
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    // only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
    if (!isLeader)
        assignmentSnapshot = null;

    PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    if (assignor == null)
        throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

    Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);

    // set the flag to refresh last committed offsets
    subscriptions.needRefreshCommits();

    // update partition assignment (the list of assigned TPs)
    subscriptions.assignFromSubscribed(assignment.partitions());

    // check if the assignment contains some topics that were not in the original
    // subscription, if yes we will obey what leader has decided and add these topics
    // into the subscriptions as long as they still match the subscribed pattern
    //
    // TODO this part of the logic should be removed once we allow regex on leader assign
    Set<String> addedTopics = new HashSet<>();
    for (TopicPartition tp : subscriptions.assignedPartitions()) {
        if (!joinedSubscription.contains(tp.topic()))
            addedTopics.add(tp.topic());
    }

    if (!addedTopics.isEmpty()) {
        Set<String> newSubscription = new HashSet<>(subscriptions.subscription());
        Set<String> newJoinedSubscription = new HashSet<>(joinedSubscription);
        newSubscription.addAll(addedTopics);
        newJoinedSubscription.addAll(addedTopics);

        this.subscriptions.subscribeFromPattern(newSubscription);
        this.joinedSubscription = newJoinedSubscription;
    }

    // update the metadata and enforce a refresh to make sure the fetcher can start
    // fetching data in the next iteration
    this.metadata.setTopics(subscriptions.groupSubscription());
    client.ensureFreshMetadata();

    // give the assignor a chance to update internal state based on the received assignment
    assignor.onAssignment(assignment);

    // reschedule the auto commit starting from now
    this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;

    // execute the user's callback after rebalance (the rebalance listener)
    ConsumerRebalanceListener listener = subscriptions.listener();
    log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
    try {
        Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
        listener.onPartitionsAssigned(assigned);
    } catch (WakeupException | InterruptException e) {
        throw e;
    } catch (Exception e) {
        log.error("User provided listener {} for group {} failed on partition assignment",
                listener.getClass().getName(), groupId, e);
    }
}

At this point, the consumer has successfully joined the group.
