This time we focus on the Consumer side: how consumers actually consume, and how multiple consumers divide up multiple Partitions. Let's summarize the whole flow.
Simply put, in the most basic model one Topic is consumed by a single Consumer.
That model suffers from a slow consumption rate, so on the Server side the Topic is split into Partitions.
This raises Producer and Server throughput and load-balances production.
But we still have to deal with the fact that there may be more than one Consumer.
To coordinate multiple Consumers, Kafka provides the concept of a consumer group:
the Consumer Group. Every Consumer must be configured with a group id at startup.
This looks great, but what happens if the consumer group gains one more consumer than it needs?
If two consumers in the same group read the same Partition, the ordering of the data within that Partition can no longer be guaranteed.
Hence the rule: in Kafka, a partition of a Topic can be consumed by only one consumer within a given consumer group.
It follows that the number of consumers in a group should not exceed the number of partitions of the subscribed topics, otherwise some Consumers will have nothing to consume.
Moreover, if a consumer in the group fails, the partitions must be redistributed. That redistribution is called a rebalance, and it is particularly expensive.
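As a concrete reference, here is a minimal sketch of configuring a consumer into a group; the broker address, group id, and topic name are placeholders, not values from this article:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "demo-group");              // the group id every consumer must carry
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // consumers created with the same group.id form one consumer group
        // and split the subscribed topic's partitions among themselves
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
    }
}
```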
A Consumer group also needs a dedicated manager. That manager is called the Coordinator, and one runs on every Broker.
How is the relationship between a Broker's Coordinator and a Consumer Group determined?
It is derived from the Consumer's committed offsets (we will cover this in detail later). Roughly: whichever partition of __consumer_offsets the consumer group's offset information is written to, the Coordinator on the broker leading that partition serves as the consumer group's coordinator.
And how are Topics and Partitions assigned within a Consumer Group? That is delegated to the Consumer Group's Leader, and the Consumer Group Leader is the first Consumer to join the Group.
The Leader decides the TP assignment strategy, and consumption then proceeds at the Consumer Group level.
Kafka provides four assignment strategies:
Range, RoundRobin, Sticky, and Cooperative.
Let's go through the logic of each of these strategies.
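Which strategy a consumer uses is chosen via the partition.assignment.strategy consumer config; to my knowledge RangeAssignor is the default in the client versions discussed here. A hedged configuration sketch:

```java
Properties props = new Properties();
// pick the assignor implementation; RangeAssignor is the default
props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");
// StickyAssignor and, in newer clients, CooperativeStickyAssignor are set the same way
```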
1. First, the Range strategy.
Its logic is actually quite simple:
```java
for (String memberId : subscriptions.keySet())
    assignment.put(memberId, new ArrayList<TopicPartition>());

for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
    String topic = topicEntry.getKey();
    List<String> consumersForTopic = topicEntry.getValue();

    Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
    if (numPartitionsForTopic == null)
        continue;

    Collections.sort(consumersForTopic);

    int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();   // topic partition count divided by consumer count
    int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); // count of extra partitions

    List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
    for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
        int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
        int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
        assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
    }
}
```
In the code above, we first compute, from the total partition count, the base number of partitions each consumer should receive:

```java
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); // topic partition count divided by consumer count
```

Then we compute how many TPs remain after the base allocation, i.e. the remainder:

```java
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); // count of extra partitions
```

The base share is handed out first, and then the remainder is distributed one by one across the consumers at the front of the sorted list.
Suppose we have two topics t1 and t2, each with 10 partitions, and three consumer threads C1-0, C2-0 and C2-1. The final assignment is:
C1-0: T1(0,1,2,3) T2(0,1,2,3)
C2-0: T1(4,5,6) T2(4,5,6)
C2-1: T1(7,8,9) T2(7,8,9)
This strategy has a serious drawback: every topic's leftover partitions always go to the consumers that sort first, so when there are multiple topics with uneven partition counts, the more topics there are, the more partitions pile up on the first Consumer.
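To make the skew concrete, here is a small self-contained simulation of the range arithmetic above; the topic and consumer names are invented for illustration. With three topics of four partitions each and three consumers, C0 ends up with six partitions while C1 and C2 get three each:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeSkewDemo {
    public static void main(String[] args) {
        // three consumers, already sorted the way RangeAssignor sorts member ids
        List<String> consumers = Arrays.asList("C0", "C1", "C2");
        // three hypothetical topics with four partitions each
        Map<String, Integer> partitionsPerTopic = new LinkedHashMap<>();
        for (String topic : Arrays.asList("t0", "t1", "t2"))
            partitionsPerTopic.put(topic, 4);

        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String c : consumers)
            assignment.put(c, new ArrayList<>());

        // the same arithmetic as the RangeAssignor snippet above, applied topic by topic
        for (Map.Entry<String, Integer> entry : partitionsPerTopic.entrySet()) {
            int numPartitions = entry.getValue();
            int base = numPartitions / consumers.size();   // base share per consumer
            int extra = numPartitions % consumers.size();  // leftovers go to the first consumers
            for (int i = 0; i < consumers.size(); i++) {
                int start = base * i + Math.min(i, extra);
                int length = base + (i + 1 > extra ? 0 : 1);
                for (int p = start; p < start + length; p++)
                    assignment.get(consumers.get(i)).add(entry.getKey() + "-" + p);
            }
        }

        // prints C0 with 6 partitions, C1 and C2 with 3 each: the skew compounds per topic
        for (Map.Entry<String, List<String>> e : assignment.entrySet())
            System.out.println(e.getKey() + " (" + e.getValue().size() + "): " + e.getValue());
    }
}
```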
The second is RoundRobin.
It sorts all consumers in the group together with all partitions of the subscribed Topics, then deals the partitions out to the consumers one at a time.
This requires every consumer in the group to subscribe to the same topics.
Suppose we have two topics, each with 4 partitions.
First the TPs are sorted by hash code; say the sorted order comes out as T2-0, T1-3, T1-0, T2-1, T1-2, T2-2, T1-1, T2-3,
and our three consumer threads sort as C1, C2, C3.
Dealing them out round-robin then yields:
C1: T2-0,T2-1,T1-1
C2: T1-3,T1-2,T2-3
C3: T1-0,T2-2
If the consumers within one group subscribe to different topics, the assignment becomes uneven: a partition can only be dealt to a consumer that subscribes to its topic, so one consumer may receive none of a topic's partitions while another absorbs far more than its share. For example, if C1 subscribes only to t1 while C2 subscribes to both t1 and t2, all of t2's partitions can land only on C2.
The third is the Sticky assignment strategy.
Its goal: while keeping the assignment balanced, make each reassignment preserve as much of the previous assignment as possible.
Suppose there are four topics t0, t1, t2, t3, each with two partitions, and the whole group (three consumers C0, C1, C2) subscribes to all eight partitions: t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1.
The initial assignment comes out as:
Consumer C0: t0p0, t1p1, t3p0
Consumer C1: t0p1, t2p0, t3p1
Consumer C2: t1p0, t2p1
This looks identical to what RoundRobinAssignor would produce. But suppose a reassignment is triggered, say because consumer C1 leaves the group.
Under the RoundRobinAssignor strategy, the result would become:
Consumer C0: t0p0, t1p0, t2p0, t3p0
Consumer C2: t0p1, t1p1, t2p1, t3p1
Notice how many TPs changed hands: besides C1's three orphaned partitions, t1p0 and t1p1 also swapped owners even though they did not need to move.
Under the Sticky strategy, the result would instead be roughly:
Consumer C0: t0p0, t1p1, t3p0, t2p0
Consumer C2: t1p0, t2p1, t0p1, t3p1
That is, the previous assignment is kept intact, and only C1's former TPs are divided between the survivors.
Finally, the Cooperative strategy: it builds on the Sticky strategy and adds rules for how the orphaned TPs are handed out during a reassignment.
A reassignment is triggered when:
a consumer is added to or removed from the group;
the topics a consumer subscribes to change;
the partitions of a subscribed topic change.
Now let's look at the source code for how a Consumer joins a group.
First, as mentioned above, the mapping between the GroupCoordinator and a Consumer Group relies on a topic named
__consumer_offsets.
This is an internal Kafka topic dedicated to recording group consumption state; by default it has 50 partitions, each with 3 replicas.
Given the __consumer_offsets topic, the relationship between a consumer group and its Coordinator is computed as follows:
take abs(GroupId.hashCode()) % NumPartitions. The result always lands on one Partition, and the Coordinator on the node hosting that Partition's leader serves as the group's Coordinator.
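A sketch of that computation, assuming the default 50 partitions; note that Math.abs here is a simplification (it mishandles the Integer.MIN_VALUE hashCode edge case, which Kafka's own utility accounts for), and the group id is hypothetical:

```java
String groupId = "demo-group"; // hypothetical group id
int numPartitions = 50;        // default partition count of __consumer_offsets
int partition = Math.abs(groupId.hashCode()) % numPartitions;
// the broker leading this partition of __consumer_offsets
// acts as the GroupCoordinator for "demo-group"
System.out.println("group metadata partition: __consumer_offsets-" + partition);
```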
When a Consumer starts and subscribes to a topic, the corresponding join-Consumer-Group flow is triggered.
The relevant code lives in ConsumerCoordinator.poll():
```java
// note: ensures that this group's coordinator is known and that this consumer has joined
// the group; also used for periodic offset commits
public void poll(long now) {
    invokeCompletedOffsetCommitCallbacks(); // note: used for testing

    // note: Step1: topics were subscribed via subscribe() but the coordinator is unknown,
    // so initialize the Consumer Coordinator
    if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
        // note: find the GroupCoordinator's address and establish a connection
        ensureCoordinatorReady();
        now = time.milliseconds();
    }

    // note: Step2: decide whether we need to rejoin the group; if the subscribed partitions
    // or the assigned partitions changed, a rejoin is needed
    if (needRejoin()) {
        // due to a race condition between the initial metadata fetch and the initial rebalance,
        // we need to ensure that the metadata is fresh before joining initially. This ensures
        // that we have matched the pattern against the cluster's topics at least once before joining.
        // note: refresh the metadata before rejoining (for AUTO_PATTERN subscriptions)
        if (subscriptions.hasPatternSubscription())
            client.ensureFreshMetadata();

        // note: ensure the group is active; join the group; get the subscribed partitions assigned
        ensureActiveGroup();
        now = time.milliseconds();
    }

    // note: Step3: check that the heartbeat thread is healthy; if it failed, throw its
    // exception, otherwise record the time of this poll call
    pollHeartbeat(now);

    // note: Step4: if auto-commit is enabled and the interval has elapsed, commit automatically
    maybeAutoCommitOffsetsAsync(now);
}
```
The poll method breaks down into the following steps:
First, initialize the Consumer Coordinator, which means finding the GroupCoordinator's address and establishing a connection.
Then decide whether the consumer needs to rejoin the group; if the subscribed partitions have changed, a rejoin is required.
Then check the state of the heartbeat thread.
Finally, perform the auto-commit if it is due.
The heavy lifting is done in two places:
ensureCoordinatorReady in step one and ensureActiveGroup in step two.
First, the ensureCoordinatorReady() function.
It picks the broker with the fewest in-flight requests and tries to obtain the proper GroupCoordinator from it.
Let's look at the corresponding source:
```java
protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
    long remainingMs = timeoutMs;

    while (coordinatorUnknown()) {
        // note: find the GroupCoordinator and establish a connection
        RequestFuture<Void> future = lookupCoordinator();
        client.poll(future, remainingMs);

        if (future.failed()) { // note: the lookup failed
            if (future.isRetriable()) {
                remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
                if (remainingMs <= 0)
                    break;

                log.debug("Coordinator discovery failed for group {}, refreshing metadata", groupId);
                client.awaitMetadataUpdate(remainingMs);
            } else
                throw future.exception();
        } else if (coordinator != null && client.connectionFailed(coordinator)) {
            // we found the coordinator, but the connection has failed, so mark
            // it dead and backoff before retrying discovery
            coordinatorDead();
            time.sleep(retryBackoffMs);
        }

        remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
        if (remainingMs <= 0)
            break;
    }

    return !coordinatorUnknown();
}
```
This method keeps trying to send a GroupCoordinator lookup request to the least-loaded Broker;
if the lookup fails, it waits for a while and then retries.
```java
// note: pick the node with the fewest in-flight requests and send it a GroupCoordinator request
protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // find a node to ask about the coordinator
        Node node = this.client.leastLoadedNode(); // NOTE: find a node to send the GroupCoordinator request to
        if (node == null) {
            // TODO: If there are no brokers left, perhaps we should use the bootstrap set
            // from configuration?
            log.debug("No broker available to send GroupCoordinator request for group {}", groupId);
            return RequestFuture.noBrokersAvailable();
        } else
            findCoordinatorFuture = sendGroupCoordinatorRequest(node); // NOTE: send the request and handle the response
    }
    return findCoordinatorFuture;
}
```
As the pre-send step, it obtains the node with the fewest in-flight requests,
and then performs the actual send:
```java
private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
    // initiate the group metadata request
    log.debug("Sending GroupCoordinator request for group {} to broker {}", groupId, node);
    GroupCoordinatorRequest.Builder requestBuilder =
            new GroupCoordinatorRequest.Builder(this.groupId);
    return client.send(node, requestBuilder)
            .compose(new GroupCoordinatorResponseHandler());
    // NOTE: compose adapts GroupCoordinatorResponseHandler into a RequestFuture;
    // effectively it replaces onSuccess() and onFailure() on the returned future
}

private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {

    @Override
    public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
        log.debug("Received GroupCoordinator response {} for group {}", resp, groupId);

        GroupCoordinatorResponse groupCoordinatorResponse = (GroupCoordinatorResponse) resp.responseBody();
        // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
        // for the coordinator in the underlying network client layer
        // TODO: this needs to be better handled in KAFKA-1935
        Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
        clearFindCoordinatorFuture();
        if (error == Errors.NONE) {
            // note: the GroupCoordinator was found; connect to it and reset the heartbeat timing
            synchronized (AbstractCoordinator.this) {
                AbstractCoordinator.this.coordinator = new Node(
                        Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
                        groupCoordinatorResponse.node().host(),
                        groupCoordinatorResponse.node().port());
                log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
                client.tryConnect(coordinator);               // note: initiate the TCP connection
                heartbeat.resetTimeouts(time.milliseconds()); // note: reset the heartbeat timing
            }
            future.complete(null);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(new GroupAuthorizationException(groupId));
        } else {
            log.debug("Group coordinator lookup for group {} failed: {}", groupId, error.message());
            future.raise(error);
        }
    }

    @Override
    public void onFailure(RuntimeException e, RequestFuture<Void> future) {
        clearFindCoordinatorFuture();
        super.onFailure(e, future);
    }
}
```
The send above registers a callback handler, and the main logic lives inside it.
In the onSuccess function, if the GroupCoordinator was obtained without error, the client records it and tries to connect:
```java
AbstractCoordinator.this.coordinator = new Node(
        Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
        groupCoordinatorResponse.node().host(),
        groupCoordinatorResponse.node().port());
```
Then comes the ensureActiveGroup we mentioned at the beginning; this is the step that formally joins the group through the GroupCoordinator.
The overall call chain:
ensureActiveGroup -> ensureCoordinatorReady -> startHeartbeatThreadIfNeeded -> joinGroupIfNeeded
```java
public void ensureActiveGroup() {
    // always ensure that the coordinator is ready because we may have been disconnected
    // when sending heartbeats and does not necessarily require us to rejoin the group.
    ensureCoordinatorReady();       // NOTE: make sure the GroupCoordinator is connected
    startHeartbeatThreadIfNeeded(); // NOTE: start the heartbeat thread (it only actually sends once the conditions are met)
    joinGroupIfNeeded();            // NOTE: send the JoinGroup request and process the returned information
}
```
Inside join-group, the flow is: initialize the join, send the JoinGroup request, run
onJoinLeader/onJoinFollower, and finally send the SyncGroup request and handle the returned result.
Let's start with the implementation behind joinGroupIfNeeded:
```java
// note: join group
public void joinGroupIfNeeded() {
    while (needRejoin() || rejoinIncomplete()) {
        // make sure once more that the GroupCoordinator is ready
        ensureCoordinatorReady();

        // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
        // time if the client is woken up before a pending rebalance completes. This must be called
        // on each iteration of the loop because an event requiring a rebalance (such as a metadata
        // refresh which changes the matched subscription set) can occur while another rebalance is
        // still in progress.
        // note: trigger onJoinPrepare, which covers the offset commit and the rebalance listener
        if (needsJoinPrepare) {
            onJoinPrepare(generation.generationId, generation.memberId);
            needsJoinPrepare = false;
        }

        // note: build the JoinGroup request and send it
        RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future);
        resetJoinGroupFuture(); // NOTE: reset joinFuture to null

        if (future.succeeded()) { // note: join succeeded; by this point the sync-group has actually succeeded too
            needsJoinPrepare = true;
            onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
        } else {
            RuntimeException exception = future.exception();
            if (exception instanceof UnknownMemberIdException ||
                    exception instanceof RebalanceInProgressException ||
                    exception instanceof IllegalGenerationException)
                continue;
            else if (!future.isRetriable())
                throw exception;
            time.sleep(retryBackoffMs);
        }
    }
}
```
The key call here is the initiateJoinGroup() function.
```java
// NOTE: send the JoinGroup request and add a listener
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    // we store the join future in case we are woken up by the user after beginning the
    // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
    // to rejoin before the pending rebalance has completed.
    if (joinFuture == null) {
        // fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
        // Note that this must come after the call to onJoinPrepare since we must be able to continue
        // sending heartbeats if that callback takes some time.
        // note: the heartbeat thread is stopped during the rebalance
        disableHeartbeatThread();

        state = MemberState.REBALANCING;     // NOTE: mark the member as rebalancing
        joinFuture = sendJoinGroupRequest(); // NOTE: send the JoinGroup request
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // handle join completion in the callback so that the callback will be invoked
                // even if the consumer is woken up before finishing the rebalance
                synchronized (AbstractCoordinator.this) {
                    log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);
                    state = MemberState.STABLE; // NOTE: mark the Consumer as stable
                    if (heartbeatThread != null)
                        heartbeatThread.enable();
                }
            }

            @Override
            public void onFailure(RuntimeException e) {
                // we handle failures below after the request finishes. if the join completes
                // after having been woken up, the exception is ignored and we will rejoin
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.UNJOINED; // NOTE: mark the Consumer as unjoined
                }
            }
        });
    }
    return joinFuture;
}
```
The code above is straightforward: it sends the join request to the GroupCoordinator, marks the Consumer's state in the callbacks, and re-enables the heartbeat thread on success.
One thing to note about sendJoinGroupRequest: it composes a handler onto the returned future, so the response is pre-processed by that handler first:
```java
private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();

    // send a join group request to the coordinator
    log.info("(Re-)joining group {}", groupId);
    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
            groupId,
            this.sessionTimeoutMs,
            this.generation.memberId,
            protocolType(),
            metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);

    log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
    return client.send(coordinator, requestBuilder)
            .compose(new JoinGroupResponseHandler());
}
```
The onJoinLeader and onJoinFollower flows live in that handler:
```java
// NOTE: handler that processes the JoinGroup response (and syncs the group information)
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        Errors error = Errors.forCode(joinResponse.errorCode());
        if (error == Errors.NONE) {
            log.debug("Received successful JoinGroup response for group {}: {}", groupId, joinResponse);
            sensors.joinLatency.record(response.requestLatencyMs());

            synchronized (AbstractCoordinator.this) {
                if (state != MemberState.REBALANCING) { // NOTE: if the Consumer is no longer rebalancing, raise an exception
                    // if the consumer was woken up before a rebalance completes, we may have already left
                    // the group. In this case, we do not want to continue with the sync group.
                    future.raise(new UnjoinedGroupException());
                } else {
                    AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
                            joinResponse.memberId(), joinResponse.groupProtocol());
                    AbstractCoordinator.this.rejoinNeeded = false;
                    // NOTE: join-group succeeded; next comes the sync-group to obtain the assigned tp list
                    if (joinResponse.isLeader()) {
                        onJoinLeader(joinResponse).chain(future);
                    } else {
                        onJoinFollower().chain(future);
                    }
                }
            }
        }
        // the branches below handle the error cases; we set them aside for now
        else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
            log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.",
                    groupId, coordinator());
            // backoff and retry
            future.raise(error);
        } else if (error == Errors.UNKNOWN_MEMBER_ID) {
            // reset the member id and retry immediately
            resetGeneration();
            log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
            future.raise(Errors.UNKNOWN_MEMBER_ID);
        } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
            // re-discover the coordinator and retry with backoff
            coordinatorDead();
            log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}",
                    groupId, error.message());
            future.raise(error);
        } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                || error == Errors.INVALID_SESSION_TIMEOUT
                || error == Errors.INVALID_GROUP_ID) {
            // log the error and re-throw the exception
            log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
            future.raise(error);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(new GroupAuthorizationException(groupId));
        } else {
            // unexpected error, throw the exception
            future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
        }
    }
}
```
As the code shows, a Consumer that is still in the REBALANCING state
then processes the response according to whether it was elected Leader or follower.
On the server side, the group walks through a small state machine:
When a group is created under a new group id, its state starts as Empty.
When the GroupCoordinator receives a consumer's join-group request and the group's member list is empty, that first member is made leader.
Once the GroupCoordinator has the leader's join-group request, a rebalance is triggered and the group's state becomes PreparingRebalance.
The GroupCoordinator then waits a while to determine how many consumers are still alive, after which the group moves to the AwaitSync state and the join responses are returned.
When a Consumer receives the GroupCoordinator's response: if it is the leader, it performs the tp assignment and sends the result in its sync-group request;
if it is a follower, it sends a sync-group request with an empty list.
Once the GroupCoordinator receives the leader's request, it returns the assignment result to every consumer instance that sent a sync-group request and sets its own state to Stable; if it receives further sync-group requests, the state is already Stable, so it simply returns the result directly.
In short, the group-side transitions are Empty -> PreparingRebalance -> AwaitSync -> Stable.
The implementations of onJoinLeader and onJoinFollower are as follows:
```java
// note: when the consumer is a follower, it pulls the assignment result from the GroupCoordinator
private RequestFuture<ByteBuffer> onJoinFollower() {
    // send follower's sync group with an empty assignment
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
                    Collections.<String, ByteBuffer>emptyMap());
    log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}",
            groupId, this.coordinator, requestBuilder);
    return sendSyncGroupRequest(requestBuilder);
}

// note: when the consumer client is the leader, it assigns partitions to all instances
// in the group and sends the assignment result to the GroupCoordinator
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // perform the leader synchronization and send back the assignment for the group
        Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(),
                joinResponse.groupProtocol(), joinResponse.members()); // NOTE: perform the assign operation

        SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
        log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}",
                groupId, this.coordinator, requestBuilder);
        return sendSyncGroupRequest(requestBuilder); // NOTE: send the sync-group request
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}
```
After sendSyncGroupRequest is issued, the normal path eventually reaches ConsumerCoordinator's onJoinComplete, which updates the metadata, including the subscribed tp list:
```java
// note: joining the group succeeded
@Override
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    // only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
    if (!isLeader)
        assignmentSnapshot = null;

    PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    if (assignor == null)
        throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

    Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);

    // set the flag to refresh last committed offsets
    // note: mark that the last committed offsets need to be re-fetched
    subscriptions.needRefreshCommits();

    // update partition assignment
    // note: update the subscribed tp list
    subscriptions.assignFromSubscribed(assignment.partitions());

    // check if the assignment contains some topics that were not in the original
    // subscription, if yes we will obey what leader has decided and add these topics
    // into the subscriptions as long as they still match the subscribed pattern
    //
    // TODO this part of the logic should be removed once we allow regex on leader assign
    Set<String> addedTopics = new HashSet<>();
    for (TopicPartition tp : subscriptions.assignedPartitions()) {
        if (!joinedSubscription.contains(tp.topic()))
            addedTopics.add(tp.topic());
    }

    if (!addedTopics.isEmpty()) {
        Set<String> newSubscription = new HashSet<>(subscriptions.subscription());
        Set<String> newJoinedSubscription = new HashSet<>(joinedSubscription);
        newSubscription.addAll(addedTopics);
        newJoinedSubscription.addAll(addedTopics);

        this.subscriptions.subscribeFromPattern(newSubscription);
        this.joinedSubscription = newJoinedSubscription;
    }

    // update the metadata and enforce a refresh to make sure the fetcher can start
    // fetching data in the next iteration
    // note: refresh the metadata so fetching can start in the next loop iteration
    this.metadata.setTopics(subscriptions.groupSubscription());
    client.ensureFreshMetadata();

    // give the assignor a chance to update internal state based on the received assignment
    assignor.onAssignment(assignment);

    // reschedule the auto commit starting from now
    this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;

    // execute the user's callback after rebalance
    // note: invoke the user's rebalance listener
    ConsumerRebalanceListener listener = subscriptions.listener();
    log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
    try {
        Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
        listener.onPartitionsAssigned(assigned);
    } catch (WakeupException | InterruptException e) {
        throw e;
    } catch (Exception e) {
        log.error("User provided listener {} for group {} failed on partition assignment",
                listener.getClass().getName(), groupId, e);
    }
}
```
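The listener invoked at the end of onJoinComplete is the user-supplied ConsumerRebalanceListener. A minimal sketch of hooking into the assignment, assuming a consumer instance as configured earlier, the usual imports (ConsumerRebalanceListener, TopicPartition), and a placeholder topic name:

```java
consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // runs before the rebalance takes partitions away: a natural place to commit offsets
        System.out.println("revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // invoked from onJoinComplete once the new assignment has been applied
        System.out.println("assigned: " + partitions);
    }
});
```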
At this point, the consumer has successfully joined the group.
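To close the loop, everything above is driven from the ordinary poll cycle: the first poll() discovers the coordinator, joins the group, and receives an assignment before any records are fetched. A minimal usage sketch, reusing the props configured earlier, with a placeholder topic name and the old-style millisecond poll(long) API matching the client version discussed here:

```java
consumer.subscribe(Collections.singletonList("demo-topic"));
while (true) {
    // each poll() first runs ConsumerCoordinator.poll():
    // ensureCoordinatorReady -> ensureActiveGroup -> heartbeat check -> auto-commit
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("%s-%d@%d: %s%n",
                record.topic(), record.partition(), record.offset(), record.value());
}
```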