本章,我们讨论下分布式计算的实现模型,如何将抽象的计算流图,转换为实实在在的分布式计算任务,并且以并行计算的方式进行交付执行。

在Spark种,任何一个程序的入口,都是带有SparkSession的main函数,其包罗万物。提供Spark运行时上下文的同时,为开发者提创建,转换,计算 分布式数据集的开发API

对于这样的一个main函数,称之为Drvier

Driver就负责将代码进行解析,构建计算流图,然后交付进行运行,也就是类似主脑的地位,实际负责运行的,在Spark环境中,被称为Executor

图片

大致是这样的一个架构

而在Driver进程之中,DAGScheduler,TaskScheduler,SchedulerBackend三个对象进行通力合作。

将用户的代码转换为计算流图

根据计算流图拆分出分布式任务

然后将分布式任务分发到Executors中

接收到任务之后,Executors调用内部的线程池,执行任务代码,对于一个完整的RDD,每个Executors负责处理其中的一个数据分片子集,进行处理

之后,我们拿上面的CountWord作为例子,看下如何形成的执行过程

首先我们在执行的时候,需要先进入spark-shell交互式环境

Spark-shell有很多命令行参数,其中最为重要的有两类,一类是指定部署master,一类指定任务所需的资源。

然后默认的进入模式,指定master部署模式为local,第二个是部署规模,默认是按照CPU个数启动的,假设有4个CPU,敲入spark-shell的时候,会启动1个Driver进程,3个Executors进程。

然后我们输入了不同的计算原语,直到输入take这个Action算子的时候,会触发之前构建好的计算流图。划分不同的任务,分发给不同的Executors

那么,如何划分不同的任务呢? 是根据的Shuffle任务为边界进行的划分

也就是洗牌任务,这种任务,对应到我们之前的Count Word任务中,就是reduceByKey算子,就比如一个单词spark,在之前可能散落在不同的Executor中,我们需要将不同的Executor进程中的单词,汇总到一个Executor中,这就是Shuffle

那么根据CountWord的代码,大致可以分为两个部分,一部分是textFile,flatMap,filter,map

组合成一份任务,然后另一份是reduceByKey,确定了这两个任务之后,我们就可以将其发给对应的Executors进行执行,整体的流程参考如下

图片

这样,我们就在本地部署的环境中进行了运行,不过一般的Spark代码运行,都是依赖于集群环境,而不是一个单机环境,而Spark集群支持运行在Standalone,YARN,Mesos,Kubernetes。

这里我们也简单说下如何进行Standalone模式下的Spark集群搭建。

首先准备三个机器,并配置对应的SSH互联

图片

然后安装spark

图片

之后启动Master和Worker节点

图片

这样就启动了一个Standalone的集群运行Spark任务了。

之后我们更加详细的讲述下分布式计算中的整体架构

//插入昨晚写的

首先是核心组件DAGScheduler

负责将形成的计算图DAG拆分为执行Stages,并且将每一个Stage转换为TaskSets,

而在TaskSets中,包含了多个Task,也就是多个分布式任务,每一个Task中包含的属性主要如下

图片

上面的属性中,stageId和stageAttempld标记了Task和Stage的对应关系

TaskBinary封装了用户代码,partition则是对应的RDD数据分区,locs则是用于计算任务倾向的计算节点。

那么关于一些Task的细节我们最后再说,先来总结下DAGScheduler的主要职责

首先根据用户代码构建DAG

然后以Shuffle 为边界进行Stages的切割

基于Stages创建TaskSets,并且将TaskSets交为TaskScheduler进行调度

那么DAGScheduler讲完了

我们就可以看看TaskScheduler的功能了,不过TaskScheduler在获取到了TaskSets之后,需要先去找SchedulerBackend获取执行资源

我们就先看下SchedulerBackend这个组件的功能

其内部维护了个名为ExecutorDataMap的数据机构,以Executor节点为Key,保存了每个节点的资源状态。其中包含了Executor的RPC地址,主机地址,可用CPU核数和CPU总数。

这样,在TaskScheduler请求资源的都是,就可以返回给TaskScheduler多个WorkOffer了,其中每个WorkOffer都封装了ExecutorID, 主机地址,CPU核数方便TaskScheduler用于任务调度

至于SchedulerBackend如何维护这个ExecutorDataMap,则是通过了ExecutorBackend保持周期性通信,ExecutorBackend存在于每一个节点上,来进行消息的互通有无,记录资源变更

然后我们回到TaskScheduler,在获取到多个WorkerOffer之后,TaskScheduler会将Task分配到不同的节点上

那么如何进行分配的,则是根据每一个Task中存在的locs字段,也就是倾向性来决定的。

由于 移动数据不如移动计算的概念,所以locs中保存了本地倾向性来进行调度

比如我们数据存放在node0,node1节点,那么我们在设置读取Task的时候更倾向于放在node0,node1节点上,这种节点倾向称为 NODE_LOCAL

除了这种节点调度,还有着PROCESS_LOCAL、RACK_LOCAL 和 ANY。

上面的调度规则的优先级则是 PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL > ANY

按照优先级进行调度,确认分配节点

图片

确认节点后,发往对应节点上的ExecutorBackend进行执行。

而Executor Backend除了负责协助SchedulerBackend来维护各个节点的资源状态

还负责实际的执行操作,负责实际执行-Task

每当Task处理完成,ExecutorBackend还会向SchedulerBackend 发送StatusUpdate事件,告知Task的执行状态,最终将状态汇报给DAGScheduler

从而让DAGScheduler知道自己完成了某一个Stage的任务调度。

而在DAGScheduler中,因为存在两种Stage,ResultStage和ShuffleMapStage

ExecutorBackend上报的状态更新到了TaskScheduler,其会放在LinkedBlockingDequeu队列中,让DAGScheduler进行消费,

DAGScheduler会标记这个Task对应的Executor完成,然后检查对应的Stage是否还有没处理的Task,如果没有则说明Stage完成,继续等待后续Stage完成。

发表评论

邮箱地址不会被公开。 必填项已用*标注