本章我们希望通过讲解broker和client之间的通信,来串联Kafka不同功能的实现。

Kafka客户端和broker传输数据的时候,需要创建一个特定的Socket连接。然后分别按照规定好的请求和响应进行传输。而这样的一条Socket,会成为一条长连接,从而减少建立TCP的开销。

对于broker段,只需要维护一个Socket,进行数据的传输

对于client,则是创建多个Socket,进行网络传输,包括不同的broker,元数据获取,然后在之上利用了epoll的方式来进行轮询传输数据。

基本的请求和响应结构都是相似的,都包含了Size + Request/Response

Size是一个int32的正数,表示了这个消息的整体长度。

而Request中,基本分为了请求头和请求体,请求头结构基本固定,api_key 请求类型

Api_version 请求版本号

Correlation_id 响应的关联号,用于关联response和request,方便用户调试和排错。

Client_id 发出这个请求的client ID,区分不同集群client发出的请求。

响应中虽然也分为了头部和响应体,但是请求头中结构是非固定的,只有下面的字段

Correlation_id,用于管理request和response。

请求类型:

多达38个,不过我们会讲解一些比较重点的。

PRODUCE请求:

具有6个版本,最新版本格式为 事务Id + acks + timeout + topic[数据]

事务ID 为了支持事务而引入的字段

Acks 返回响应之前,需要保存好的broker个数。

Timeout 请求超时时间

Topic 数据,是一个数组,包含了topic + [partition + 消息集合]

Topic表示要发到哪些topic

Partition 表示发到topic的哪些分区

消息集合则是真正的消息。

PRODUCE的请求:

包含了[response+ throttle_time_ms]

throttle_time_ms 超过配额限制而延迟该请求处理的时间

response则是每个topic对应的返回数据,基本包含

partition 分区号

error_code 请求是否成功的code

base_offset 消息集合的起始位移。

Log_append_time broker端写入消息的时间

Log_start_offset response被创建时该分区日志的总起始位移。

其次是FETCH的请求和响应

存在于clients向着broker发送的FETCH请求,follower发给leader的FETCH请求。

FETCH具有7个版本,最新的版本中,请求格式为 replica_id + max_wait_time + min_bytes + max_bytes + isolation_level + [topics]

其中 replica_id  副本id,在follower向着leader发送消息的时候使用,如果是client发给broker的话,字段为-1.

max_wait_time 等待响应返回的最大时间

min_byte,max_bytes 分别代表这次FETCH携带的最小最大bytes数量

isolation_leve,隔离级别,用于事务

topic数组中包含了要请求的topic数据

其中一个元素包含 topic,分区信息,分区信息中又包含partition,fetch_offset,log_start_offset和max_bytes,其中fetch_offset是从哪个唯一开始读取消息,也是用于上面HW的同步的。

响应格式为 throttle_time_ms + [response]

其中重点是response,内部包含topic,以及消息集合。

METADATA请求:

基本格式为 [topics] + allow_auto_topic_creation

allow_auto_topic_creation 表示是否可以自动创建topic

topics则是需要获取元数据的topic数组

响应的格式为 throttle_time_ms + [brokers] + cluster_id + controller_id + [topic_metadata]

Brokers 包含了其中每个broker的详细信息,包括节点ID,主机名等信息

Cluster_id 集群ID

Controller_id 集群controller所在的broker ID

Topic_metadata,topic元数据组,包含了请求topic的所有元数据,比如

Topic_error_code topic错误码

Topic 名称

Partition_metadata topic下所有分区的元数据,每个分区的错误码,leader信息,副本信息,ISR信息。

整体的流程

对于clients段,基本流程如下

图片

大部分的请求是发给特定的broker的,比如PRODUCE和FETCH,部分例如METADATA可以发给任意一个broker,因为每个broker都保存了相同的元数据信息。

对于broker端,处理逻辑要简单的多。

图片

Broker会创建一个阻塞队列,专门接收clients发来的请求,然后由多个请求线程来处理这个阻塞队列中的请求。内部还引入了毒丸机制,即队列中获取到了一个特定的标记,就会终止自己的循环处理逻辑。

然后对于上面的版本,是需要考虑不同版本的兼容性

对于当前版本的broker所支持的版本,可以通过相关命令来查看的

Bin/kafka-broker-api-version.sh –bootstrap-server localhost:9092

发表评论

邮箱地址不会被公开。 必填项已用*标注