我们要具体的讲解分布式KV系统的核心功能点的实现细节,如何实现读操作对应的3种一致性模型等
我们之前将整个系统划分为了三个功能块,接入协议,KV操作,分布式集群,我们将按照顺序具体的说一下每块功能的实现
1.我们先说下如何实现这个接入的协议,我们选择了HTTP作为通讯协议,并设计了”/key” “/join”2个HTTP Restful API,并分别支持KV操作和增加节点的操作,它是如何实现的呢?
在ServerHTTP()中,根据URL路径设置的相关的路由信息,比如,在handlerKeyResquest()中处理URL路径为”/key”的请求,在handleJoin()中处理URL路径为”/join”请求
在handlerKeyRequest()中,处理客户端的KV请求操作,也就是基于HTTP POST请求的赋值操作,基于HTTP GET的请求的查询操作,HTTP DELETE的删除操作
在handleJoin()中,处理增加节点的请求,调用raft.AddVoter()函数,来加入集群
首先别忘了,在URL设置相关的路由信息,需要考虑的是路径前缀匹配,比如strings,HasPrefix(r.URL.Paht)这样可以匹配,而r.URL.Paht == “/key’会因为路径匹配出错,无法找路由信息,执行失败,而join的话,因为都是Post请求,路径比如是全等于 join,所以不必要担心,执行失败
curl -XGET raft-cluster-host01:8091/ley/foo
这是key请求的常见方式
2.其次,就是如何实现KV操作
查询,赋值,删除三种操作,如何实现的呢?
赋值是基于HTTP POST请求来实现的,就好比下面的样子
curl -XPOST htt://raft-cluster-host01:8091/key -d ‘{“foo”:”bar”}’
在程序的服务端,我们接收到这个POST请求后,如何实现赋值操作呢?
我们假设请求的直接是领导者,那么我们走一下这个流程
当接收到KV操作的请求的时候,系统将调用handleKeyRequest()进行处理
在handleKeyRequest()函数中,检测HTTP请求是否是POST请求,然后进行执行store.Set()函数
在Set函数中,创建指令,并通过raft.Apply()函数将指令交给raft,最终指令被应用到状态机
Raft将指令应用到状态机上,最终执行了applySet()函数,进行相应的设置操作
而查询操作,是通过HTTP GET请求实现了查询操作,查询流程相比较赋值流程,简单多了,只需要查询内存中的数据就可以了,不涉及到状态机
基本代码流程如下
收到这个查询的请求,首先调用handleKeyRequest()进行处理
在handleKeyRequest()函数中,检测到HTTP请求为GET请求,确认了这是赋值操作,执行store.GET()函数
GET()函数在内存中查询指定key的值
最后的删除操作,基于HTTP DELETE请求实现
整体的流程如下
收到了KV操作的请求后,系统调用handleKeyRequest()进行处理
在handleKeyRequest()函数中,检测HTTP请求为DELETE请求,确认这是一个删除操作,执行store.Delete()函数
然后通过raft.Apply()函数,将指令提交给Raft.最终指令被应用到了状态机
当前的Raft将指令应用到了状态机里面,最后会执行applyDelete()函数,删除key和值
记住,需要向Raft状态机中提交指令的操作,必须在领导者节点上执行,一般就是写操作,赋值和删除
然后需要查询最新的数据请求,也必须要在领导者上执行
分布式中,上面的操作有何变化吗
创建一个集群
实现一个Raft集群,首先创建这个集群,创建这个集群分为两步,首先是通过Bootstrap的方式启动,并作为领导者节点,启动的命令如下
$GOPATH/bin/raftdb -id node01 -haddr raft-cluster-host01:8091 -raddr raft-cluster-host01:8089 ~/.raftdb
这个时候,在Store.Open函数中,调用BootstrapCluster()函数将节点启动起来
其他的节点会通过-join参数来指定领导者的节点地址
$GOPATH/bin/raftdb -id node02 -haddr raft-cluster-host02:8091 -raddr raft-cluster-host02:8089 -join raft-cluster-host01:8091 ~/.raftdb
这样集群运行后之后,就会加入集群,然后接下来就可以正常启动了
$GOPATH/bin/raftdb -id node02 -haddr raft-cluster-host02:8091 -raddr raft-cluster-host02:8089 ~/.raftdb
写请求
如何实现写请求呢,按照之前的说法,当跟随者收到了写请求之后,拒绝执行这个请求,将领导者的地址返回给客户端,让客户端直接访问领导者
调用set()函数执行赋值操作
如果set成功,则执行步骤3,如果执行set失败,原因是因为这个节点不是领导者,那么就执行步骤4
如果出错了,但是当前节点是领导者,那么就判断后执行步骤5
赋值操作执行成功,正常返回
如果是步骤4,那么就返回包含领导者地址信息的重定向响应,并返回给客户端,然后客户端直接访问领导者节点进行赋值操作
而且步骤4,利用了http的重定向功能,在赋值的时候,如果不是领导者节点,会返回包含领导者信息的HTTP 30重定向响应给curl,这时候curl根据响应信息,重新发起赋值操作请求,访问领导者节点
最后是关于读操作,这一步比较复杂,因为需要考虑到不同级别的一致性读的问题
因为需要实现3种一致性的模型,就是stale default consisent,让用户根据场景特点,按需选择一致性的级别,如何实现呢
接收到HTTP GET的查询请求的时候,先调用level函数,来获取到对应的一致性读的级别
然后调用get来获取到值
如果执行成功了,直接返回
如果出错了,查看报错原因,如果报错原因是因为不是领导者,进行重定向到客户端,然后由客户端直接访问领导者节点查询数据
如果报错信息也不是因为不是领导者,就返回错误信息给客户端
curl -XGET raft-cluster-host02:8091/key/foo?level=consistent -L
我们总结一下
我们在实现接入协议的时候,使用了HTTP来进行实现相关的操作,我们通过GET请求实现了读,DELETE实现了删除
利用了HTTP的307重定向的相应,返回领导者的地址信息给客户端,并且让客户端直接再度发起请求
在Raft中,我们可以通过raft.VerifyLeader()来查看是否还是领导者
如何通过代码去移除一个节点呢?
那么整体流程还是按照写操作的流程
获取到store中进行执行
如果不报错,就说进行了RemoveServer()
报错了,查看报错信息,如果是因为不是领导者节点,那么直接返回客户端领导者的地址,让其进行重定向
如果是因为删除节点是领导者节点,无法删除导致的,那么直接返回客户端错误信息
(我这里认为是无法删除领导者节点的,应该由管理人员直接杀死领导者节点的进程)