我们需要仔细的看下当下可用的Raft分布式系统是如何开发,如何去开发实现一个稳定运行的分布式系统?
我们需要首先明明确Raft所实现的两个目标,首先实现Raft的原理,然后设计出简单易用的API接口
接下来,我们从代码实现和接口使用的两个角度,来看当下流行的Raft实现,Hashicorp Raft
1.Hashicorp Raft 实现领导者选举
我们从程序的入口函数来看,入口函数一般为main()函数,那么在整体的选举过程中,入口函数是什么呢
Hashicorp Raft源码中,领导者选举的入口函数 run(),在raft.go中以一个单独的协程来运行,实现节点的状态迁移
func (r *Raft) run() {
for { select { // 关闭节点 case <-r.shutdownCh: r.setLeader(“”) return default: } switch r.getState() { // 跟随者 case Follower: r.runFollower() // 候选人 case Candidate: r.runCandidate() // 领导者 case Leader: r.runLeader() } } } |
上面的代码分为了 Follower 跟随者 Candidate 候选人 Leader 领导者
不同的节点状态对应的功能不同,抽出的函数自然不同,分别是runFollower(),runCandidate(),
runLeader()
接下来,我们分别看下这三种节点的状态
节点状态相关的数据结构和函数,是在state,go中实现的,跟随者,候选人,两到这三个状态,分别是32位的只读整数数值 (uint32)
type RaftState uint32
const (
// 跟随者
Follower RaftState = iota
// 候选人
Candidate
// 领导者
Leader
// 关闭状态
Shutdown
)
当需要字符串格式的节点状态,可以调用对应的RaftState.String()函数
然后是节点的自我信息了
比如,任期编号,
raftState属于结构体类型,包含了当前的任期编号,最新提交的日志项的索引值,存储中最新日志项的索引值和任期编号,当前的节点状态等
type raftState struct {
// 当前任期编号 currentTerm uint64 // 最大被提交的日志项的索引值 commitIndex uint64 // 最新被应用到状态机的日志项的索引值 lastApplied uint64 // 存储中最新的日志项的索引值和任期编号 lastLogIndex uint64 lastLogTerm uint64 // 当前节点的状态 state RaftState …… } |
看完了基本的数据结构,看下其中比较重要的RPC消息通讯的过程
Hashicorp Raft如何实现RPC消息的,
首先是RPC日志复制的如何请求的,对应的数据结构为AppendEntriesRequest,这是一个结构体,里面包含了Raft算法约定的字段,
Term:任期编号
PrevLogEntry:当前要复制的日志项,前面一条日志的索引值
PrevLogTerm:当前要复制的日志项,前一条日志项的任期编号
Entries:新日志项
具体的结构消息,如下
type AppendEntriesRequest struct {
// 当前的任期编号,和领导者信息(包括服务器ID和地址信息) Term uint64 Leader []byte // 当前要复制的日志项,前面一条日志项的索引值和任期编号 PrevLogEntry uint64 PrevLogTerm uint64 // 新日志项 Entries []*Log // 领导者节点上的已提交的日志项的最大索引值 LeaderCommitIndex uint64 } |
这就是一个简单的RPC消息的数据结构
然后我们看一下具体的代码执行流程
一开始,所有的节点都处于一个跟随者的状态,函数runFollower()运行,大致的执行流程如下
1.我们首先设置一个随机的心跳超时时间间隔,来配置心跳超时时长
2.进入到for循环中,然后利用select实现了多路IO复用,周期的获取消息进行处理
3.如果超时时间到达了,还没有收到消息,就执行步骤5
设置自己的状态为候选人,然后return退出函数
当节点设置自己为Candidate函数的时候,函数runCandidate()执行,大致执行步骤如下
首先是调用electSelf()发起投票,先给自己一张票,然后向其他的节点去请求投票RPC消息,请求其选举自己为领导者,然后设置一个随机的超时时间,来获取一个选举的超时时间
然后进入for循环中,通过select实现了io多路复用,周期的获取到了消息和处理,如果发生了选举超时,直接执行步骤3,放弃选举,不然就执行步骤4,退出了runCandidate()函数,然后再重新执行runCandidate()函数,发起新一轮的选举
得到了大多数节点的相应之后,就会被当选为领导者,调用setState(),将自己状态变更为领导者,退出runCandidate()函数
然后在在状态变更的时候,调用了runLeader()函数
在整个过程中,会进行
调用startStopReplication(),进行日志复制的功能
然后启动协程,调用replication()函数,执行日志复制的功能
然后在replicate()函数中,启动一个协程,执行心跳的发送功能
在heartbeat()函数中,周期的发送心跳请求,防止别人发起选举
如何发送数据呢?
Raft基于了强领导者模型和日志复制,实现的强一致性的,那么,那么如何学习日志复制的代码实现呢?
那么我们已经知道了日志复制是领导者发出的,跟随者来接收的,领导者的复制日志的接口和跟随者接收日志的接口,分别在runLeader()和runFollower()函数中
领导者复制日志的入口接口在startStopReplication(),在runLeader()中,以r.startStopReplication()形式被调用,作为一个单独的协程运行
跟随者接收日志的入口函数为processRPC(),在runFollower()中,以r.processRPC(rpc)的形式来调用,处理日志复制RPC消息
数据结构
一个日志项中包含了3种消息,分别是指令,索引值,任期编号,而在Hashicorp Raft实现中
日志对应的数据结构如下
type Log struct {
// 索引值 Index uint64 // 任期编号 Term uint64 // 日志项类别 Type LogType // 指令 Data []byte // 扩展信息 Extensions []byte } |
日志项对应的数据结构中,包含LogType和Extensions两个额外的字段,
LogType用于表示不同用途的日志项,比如LogCommand指令对应的日志项, LogConfiguration表示成员变更配置对应的日志项
Extensions可以再指定日志项中存储一些额外的信息,主要是用来调试来用
整体的复制流程如下
在runLeader()函数中,调用startStopReplication()函数,执行日志复制功能
启动一个新的协程,来调用replicate()函数,执行日志复制相关的功能
在replicate()函数中,调用replicateTo()函数,执行步骤4,发送DIFF差异复制,如果已经开启了流水线复制模式,那么就执行步骤5
在replicateTo()函数中,进行一致性检测,如果复制成功,也会开启流水线复制模式
这时候,调用pipelineReplicate()函数,开启流水线复制
跟随者如何去接收日志
领导者复制完成了,跟随者就会接收日志并开始处理,跟随者接收和处理日志,在runFollower()函数中执行的,主要有几个步骤
1.runFollower()函数中,调用processRPC()函数,处理接收到的RPC消息
2.在processRPC()函数中,调用appendEntries(),处理接收到的消息
3.appendEntries()函数中,跟随者处理日志的核心函数,3.1中进行比较日志的一致性,在步骤3.2中,将新日志项存放在本地,3.3中,计算了当前被应用的日志项,应用到本地状态机中
本章总结一下
我们从算法原理角度来理解了Hashicorp Raft实现
分别从
三种节点状态中对应的功能函数来入手,来查看当需要实现不同状态的功能的时候,将各自对应的函数作为入口函数,来进行代码的解读
raft.go是Hashicorp Raft的核心代码文件,大部分的核心功能都在这个文件中实现的
在Hashicorp Raft中,支持两种节点间的通讯机制,内存型和TCP协议型,其中,内存通讯机制,可以用于测试,两种通讯机制的代码实现,分别在文件inmem_transport.go和tcp_transport.go中
最后如何高效的阅读源码,就是不要全都要,从而陷入琐碎的细节中,具体了解入口类就可以了
那么如何实现Hashicorp Raft的网络通讯的呢?