我们进行实践一个CSI插件的编写过程

为了简单的编写,我们使用了DigitalOcean的块存储 Block Storage服务,作为实践的对象

DigitalOcean提供的功能较少,但是方便实践

这次编写的CSI插件的功能,就是可以运行在DigitalOcean上的Kubernetes集群能够使用对应的块存储服务,作为容器的持久化存储

在DigitalOcean上部署一个Kubernetes集群不难,也可以现在DigitalOcean上创建几个虚拟机

假设对应的CSI插件已经完成了,那么我们使用这个持久化存储的方式就很简单了,只需要创建一个如下所示的StorageClass对象

kind: StorageClass

apiVersion: storage.k8s.io/v1

metadata:

name: do-block-storage

namespace: kube-system

annotations:

storageclass.kubernetes.io/is-default-class: “true”

#上面的default-class为true的意思,是使用StorageClass作为默认的持久化存储的提供者

provisioner: com.digitalocean.csi.dobs

有了这个StorageClass External Provisioner就会为集群中新出现的PVC自动创建出PV,然后调用CSI插件创建出这个PV对应的Volume,这就是CSI体系中Dynamic Provisioning的实现方式

这个StorageClass中唯一引人注意的,是provisioner=com,digitalocean.csi.dobs这个字段,这个字段可以告诉Kubernetes,请使用名叫com.digitalocean.csi.dobs的CSI插件来为我处理这个StorageClass相关的所有操作

那么Kubernetes如何知道一个CSI插件的名字的呢?

这就需要从CSI插件的第一个服务CSI Identity说了

先看看一个CSI插件的代码结构,如下所示

tree $GOPATH/src/github.com/digitalocean/csi-digitalocean/driver

$GOPATH/src/github.com/digitalocean/csi-digitalocean/driver

├── controller.go

├── driver.go

├── identity.go

├── mounter.go

└── node.go

CSI Identity服务实现,就定义在了identity.go文件中

为了能够让Kubernetes访问到CSI Identity服务,现在driver.go文件中,定义一个标准的gRPC Server

// Run starts the CSI plugin by communication over the given endpoint

func (d *Driver) Run() error {

listener, err := net.Listen(u.Scheme, addr)

d.srv = grpc.NewServer(grpc.UnaryInterceptor(errHandler))

csi.RegisterIdentityServer(d.srv, d)

csi.RegisterControllerServer(d.srv, d)

csi.RegisterNodeServer(d.srv, d)

d.ready = true // we’re now ready to go!

return d.srv.Serve(listener)

}

这个gRPC Server注册给了CSI,就可以响应来自External Components的CSI请求了

CSI Identity服务中,最重要的接口是GetPluginInfo,返回就是这个插件的名字和版本号

func (d *Driver) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {

resp := &csi.GetPluginInfoResponse{

Name:          driverName,

VendorVersion: version,

}

}

在本例中 driverName的值,就是com.digitalocean.csi.dobs Kubernetes正是通过GetPluginInfo的返回值,来找StorageClass中声明要使用的CSI的

在上篇中,还说了,这个文件,除了GetPluginInfo之外,还包含GetPluginCapabilities接口,也很重要,这个接口返回的是CSI插件的能力

如果,编写的CSI插件不准备实现Provision阶段和Attach阶段时候,就可以通过偶这个这个接口返回不支持对应的Controller服务,没有csi.PluginCapability_Service_CONTROLLER_SERVICE这个能力,这样就能知道了

最后CSI Identity插件还提供了一个Probe接口,方便Kuberenetes通过这个接口检查CSI插件是否正常工作

然后我们,可以编写第二个服务类,CSI Controller服务了,实现的代码,在controller,go文件中

这个服务主要实现的就是Volume管理流程中的Provision阶段和Attach阶段

Provision阶段对应的口是CreateVolume和DeleteVolueme,调用者是External Provisoner,以CreateVolume为例,主要的逻辑如下

func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {

volumeReq := &godo.VolumeCreateRequest{

Region:        d.region,

Name:          volumeName,

Description:   createdByDO,

SizeGigaBytes: size / GB,

}

vol, _, err := d.doClient.Storage.CreateVolume(ctx, volumeReq)

resp := &csi.CreateVolumeResponse{

Volume: &csi.Volume{

Id:            vol.ID,

CapacityBytes: size,

AccessibleTopology: []*csi.Topology{

{

Segments: map[string]string{

“region”: d.region,

},

},

},

},

}

return resp, nil

}

createVolume需要的操作,就是调用DigtalOcean块存储服务的API,创建出一个存储卷,如果是其他类型的块存储,也可以类似的方式调用创建API

Attach阶段对应的接口是ControllerPublishVolume和ControllerUnpublishVolume,调用者是External Attacher,以ControllerPublishVolume为例,逻辑如下

func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {

dropletID, err := strconv.Atoi(req.NodeId)

// check if volume exist before trying to attach it

_, resp, err := d.doClient.Storage.GetVolume(ctx, req.VolumeId)

// check if droplet exist before trying to attach the volume to the droplet

_, resp, err = d.doClient.Droplets.Get(ctx, dropletID)

action, resp, err := d.doClient.StorageActions.Attach(ctx, req.VolumeId, dropletID)

if action != nil {

ll.Info(“waiting until volume is attached”)

if err := d.waitAction(ctx, req.VolumeId, action.ID); err != nil {

return nil, err

}

}

ll.Info(“volume is attached”)

return &csi.ControllerPublishVolumeResponse{}, nil

}

对于DigtalOcean来说,ControllerPublishVolume在Attach阶段,就是调用DigitalOcean的API,将前面创建的数据卷,挂载到指定的虚拟机上 d.doClient.StorageActions.Attach

其中存储卷由请求的VolumeId来指定,而虚拟机,就是要运行Pod的宿主机,则由请求中的NodeId来指定,这些参数,就是External Attacher发起请求时候给出的

然后External Attacher的原理,就是监听Watch一个名为VolumeAttachment的API对象,这个对象的主要字段如下

// VolumeAttachmentSpec is the specification of a VolumeAttachment request.

type VolumeAttachmentSpec struct {

// Attacher indicates the name of the volume driver that MUST handle this

// request. This is the name returned by GetPluginName().

Attacher string

// Source represents the volume that should be attached.

Source VolumeAttachmentSource

// The node that the volume should be attached to.

NodeName string

}

这个对象的生命周期,就是由AttachDetachController负责管理中的相关内容,在这个kube Controller manger负责维护的循环中,就是不断的检查Pod对应的PV,并决定是否需要对这个PV进行attach或者deattch,然后创建出了一个VolumeAttachment对象,这个对象包含了Attach所需的PV名字,宿主机的名字,存储插件的名字,当这个对象出现的时候,就可以使用这个对象中的字段,封装为一个gRPC请求调用CSI Controller的ControllerPublishVolume方法

最后就是CSI的Node服务

CSI Node服务对应的,是Volume管理流程的Mount阶段,代码实现,在node.go文件中

这个服务,主要就是利用控制循环,循环的调用CSI Node服务完成Volume的Mount阶段

这个Mount阶段,被细分为了NodeStageVolume和NodePublishVolume这两个接口,同样,在上层调用者VolumeMangerReconciler控制循环中,这两部操作还有别名,分别为MountDevice和SetUp,其中MountDevice操作,就是直接调用了CSI Node服务中的NodeStageVolume接口,这个接口,就是格式化了Volume在宿主机上对应的存储设备,然后进行了挂载到一个临时目录中,

对于DigitalOcean来说,NodeStageVolume接口的实现如下

func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {

vol, resp, err := d.doClient.Storage.GetVolume(ctx, req.VolumeId)

source := getDiskSource(vol.Name)

target := req.StagingTargetPath

if !formatted {

ll.Info(“formatting the volume for staging”)

if err := d.mounter.Format(source, fsType); err != nil {

return nil, status.Error(codes.Internal, err.Error())

}

} else {

ll.Info(“source device is already formatted”)

}

if !mounted {

if err := d.mounter.Mount(source, target, fsType, options…); err != nil {

return nil, status.Error(codes.Internal, err.Error())

}

} else {

ll.Info(“source device is already mounted to the target path”)

}

return &csi.NodeStageVolumeResponse{}, nil

}

在NodeStageVolume实现中,我们通过DigitalOcean的API获取到这Volume对应的设备路径getDiskSource,然后将这个设备格式化成指定的格式 d.mounter.Format,最后,将格式化后的设备挂载到一个临时的Staging目录下 stagingTargetPath下

而SetUp操作会调用CSI Node服务的NodePublishVolume接口,对上面设备进行接下来的处理,就是绑定操作

func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {

source := req.StagingTargetPath

target := req.TargetPath

mnt := req.VolumeCapability.GetMount()

options := mnt.MountFlag

if !mounted {

ll.Info(“mounting the volume”)

if err := d.mounter.Mount(source, target, fsType, options…); err != nil {

return nil, status.Error(codes.Internal, err.Error())

}

} else {

ll.Info(“volume is already mounted”)

}

return &csi.NodePublishVolumeResponse{}, nil

}

在上面,将Staging目录,绑定挂载到了Volume对应的宿主机目录上

Staging目录,正是Volume对应的设备被格式化后在宿主机上的位置,所以当其和Volume挂载之后,这个Volume宿主机目录的持久化处理就完成了

当然,对于文件系统类型的存储服务来说,比如NFS或者GlusterFS,并不需要进行格式化后挂载,而可以直接进行相关的挂载,所以可以跳过MountDevice操作,直接执行SetUp操作,也不需要实现NodeStageVolume接口了

编写完了对应的CSI插件,直接进行部署吧

我们需要先创建一个DigitalOcean client授权使用的Secret对象

apiVersion: v1

kind: Secret

metadata:

name: digitalocean

namespace: kube-system

stringData:

access-token: “a05dd2f26b9b9ac2asdas__REPLACE_ME____123cb5d1ec17513e06da”

然后部署CSI插件

$ kubectl apply -f https://raw.githubusercontent.com/digitalocean/csi-digitalocean/master/deploy/kubernetes/releases/csi-digitalocean-v0.2.0.yaml

插件的YAML如下

kind: DaemonSet

apiVersion: apps/v1beta2

metadata:

name: csi-do-node

namespace: kube-system

spec:

selector:

matchLabels:

app: csi-do-node

template:

metadata:

labels:

app: csi-do-node

role: csi-do

spec:

serviceAccount: csi-do-node-sa

hostNetwork: true

containers:

– name: driver-registrar

image: quay.io/k8scsi/driver-registrar:v0.3.0

– name: csi-do-plugin

image: digitalocean/do-csi-plugin:v0.2.0

args :

– “–endpoint=$(CSI_ENDPOINT)”

– “–token=$(DIGITALOCEAN_ACCESS_TOKEN)”

– “–url=$(DIGITALOCEAN_API_URL)”

env:

– name: CSI_ENDPOINT

value: unix:///csi/csi.sock

– name: DIGITALOCEAN_API_URL

value: https://api.digitalocean.com/

– name: DIGITALOCEAN_ACCESS_TOKEN

valueFrom:

secretKeyRef:

name: digitalocean

key: access-token

imagePullPolicy: “Always”

securityContext:

privileged: true

capabilities:

add: [“SYS_ADMIN”]

allowPrivilegeEscalation: true

volumeMounts:

– name: plugin-dir

mountPath: /csi

– name: pods-mount-dir

mountPath: /var/lib/kubelet

mountPropagation: “Bidirectional”

– name: device-dir

mountPath: /dev

volumes:

– name: plugin-dir

hostPath:

path: /var/lib/kubelet/plugins/com.digitalocean.csi.dobs

type: DirectoryOrCreate

– name: pods-mount-dir

hostPath:

path: /var/lib/kubelet

type: Directory

– name: device-dir

hostPath:

path: /dev

kind: StatefulSet

apiVersion: apps/v1beta1

metadata:

name: csi-do-controller

namespace: kube-system

spec:

serviceName: “csi-do”

replicas: 1

template:

metadata:

labels:

app: csi-do-controller

role: csi-do

spec:

serviceAccount: csi-do-controller-sa

containers:

– name: csi-provisioner

image: quay.io/k8scsi/csi-provisioner:v0.3.0

– name: csi-attacher

image: quay.io/k8scsi/csi-attacher:v0.3.0

– name: csi-do-plugin

image: digitalocean/do-csi-plugin:v0.2.0

args :

– “–endpoint=$(CSI_ENDPOINT)”

– “–token=$(DIGITALOCEAN_ACCESS_TOKEN)”

– “–url=$(DIGITALOCEAN_API_URL)”

env:

– name: CSI_ENDPOINT

value: unix:///var/lib/csi/sockets/pluginproxy/csi.sock

– name: DIGITALOCEAN_API_URL

value: https://api.digitalocean.com/

– name: DIGITALOCEAN_ACCESS_TOKEN

valueFrom:

secretKeyRef:

name: digitalocean

key: access-token

imagePullPolicy: “Always”

volumeMounts:

– name: socket-dir

mountPath: /var/lib/csi/sockets/pluginproxy/

volumes:

– name: socket-dir

emptyDir: {}

我们通过DaemonSet在每一个节点上都启动一个CSI插件,为kubelet提供CSI Node服务,每个节点上都需要被kubelet提供CSI Node,这样CSI Node就可以直接调用,和kubelet一对一的部署起来

而且还运行了一个driver-registrara组件,以sidecar的方式运行这个组件,可以向kubelet注册这个CSI插件,通过访问同一个Pod里的CSI插件容器的Identity服务获取到

然后因为CSI插件运行在一个容器中,那么CSI Node服务在MoUnt阶段的挂载操作,一般是挂载了容器内部的Mount Namespace里了,为了可以实际挂载到宿主机上,所以我们需要将宿主机上的/var/lib/kubelet以Volume的方式挂载到CSI插件容器的同名目录下了,然后设置这个Volume的mountPropageation=Bidirectional,开启双向挂载传播,将这个容器在这个目录下进行的挂载操作,传播给宿主机

然后通过StatefulSet在任意节点上启动一个CSI插件,为External Components提供CSI Controller服务,作为CSI Controller服务的调用者,External Provisioner和External Attacher两个外部组件,就需要以sidecar的方式和这次的CSI插件定义在一个Pod中

我们使用StatefulSet是因为可以严格确保应用拓补的稳定性,对Pod更新是确保上一个已经Pod并删除后,才会插件并启动下一个Pod,这样的顺序确保是非常重要的

然后,只需要定义一个使用这个插件的StorageClass就可以了

kind: StorageClass

apiVersion: storage.k8s.io/v1

metadata:

name: do-block-storage

namespace: kube-system

annotations:

storageclass.kubernetes.io/is-default-class: “true”

provisioner: com.digitalocean.csi.dobs

如一开头的StorageClass一样

那么在本章中,我们说了如何编写一个CSI插件我们可以借助此来了解Kubernetes持久化的存储体系

当用户创建了一个PVC之后,之前部署的StatefulSet的External Provisioner容器,就会监听到这个PVC的创建,并且利用CSI Controller的CreateVolume,创建出对应的PV

运行在Kubernetes Master节点上Volume Controller,就会通过PersistentVolumeController,发现这对新创建的PV和PVC使用的是相同的StorageClass,就会将一对PV和PVC进行绑定,使得PVC进入Bound状态

全局的Volume Controller通过AttachDetachController控制循环创建一个VolumeAttachment对象,这个对象携带了宿主机A和待处理Volume名字

然后再交给我们进行Attach和Mount过程,我们的CSI Controller会监听到这个VolumeAttachment对象,然后使用这个对象的宿主机和Volume名字,调用同一个Pod里相同的CSI插件的CSI Controller服务的ControllerPublishVolume方法,完成这个Attach阶段

上述过程完成后,运行在节点上的kubelet,就会通过VolumeMangerReconciler控制循环,发现当前宿主机上有一个Volume对应的设备磁盘,已经被Attach到某个设备目录下,于是kubelet就会调用同一个宿主下的CSI Node服务的NodeStageVolume和NodePublishVolume,完成这个Volume的Mount阶段

这样,便是一个完整的持久化Volume的创建和挂载流程

那么我们需要思考一个问题,什么时候使用FlexVolume,什么时候使用CSI

简单来说,是使用场景决定的,如果使用简单,而且CSI插件包含了一部分原来Kuberentes中存储管理的功能,实现,部署比较复杂,场景比较简单,不需要Dynamic Provisioning,可以使用flexVolume,场景复杂,支持Dynamci

发表评论

邮箱地址不会被公开。 必填项已用*标注