9.RDD中其他的算子

之前我们说了RDD相关的转换算子,以及相关的聚合算子,现在我们说下其中涉及的剩余算子

剩余算子可以大致分为数据准备相关算子,数据预处理算子,收集算子

对于数据准备阶段,相关的算子是union和sample

union算子的主要作用是将两个类型相同,来源不同的RDD进行合并,从而形成一个统一的,更大的分布式数据集,比如如下的代码

我们有两个String类型的RDD,进行合并则是如下

Val rdd1: RDD[String]

Val rdd2: RDD[String]

Val rdd = Rdd1.union(rdd2)

 

对于多个类型一致的RDD,可以通过连续调用union来将所有的数据集进行合并。

 

其次是sample,用于对RDD进行随机采样,从一份大数据变为一份小数据。其中的参数有withReplacement,faction和seed

其中withReplacement的类型是Boolean,目的是采样是否有放回,具体是用于去重的,如果设置为true,就可能有重复数据,相反为false,就会进行去重处理

然后是faction参数,类型是Double,值是从0到1,具体是指的结果集和原集的尺寸比例,seed含义是种子,如果种子相同,那么采样的结果必然相同

Rdd.sample(false,0.1).collect

//每次采样结果不同

 

Rdd.sample(false,0.1,123).collect

//每次采样结果相同

这就是关于数据准备相关的算子了

 

其次是数据预处理相关的算子

分别是repartition和 coalesce

这两个是设置一个RDD并行度,另一个是专门用来降低RDD并行度

Repartition用于设置RDD的数据分区数量,coalesce用于降低RDD的并行度

首先我们可以通过

Rdd.partitions.length获取到RDD的并行度

然后可以通过

Rdd.repartition(8)

来设置并行度

而这个并行度,就是我们RDD的数据分区,每一个分布式Task,都需要一个CPU线程来执行

因此RDD的并行度,很大程度决定了分布式系统中CPU的使用效率,那么关于并行度的设置,一般可以考虑设置为可用CPU的2-3倍

不过需要注意,在使用了repartition之后,其会引入Shuffle操作,关于这一操作的引入很好理解,因为分区扩散之后,必然会导致重新清洗数据

如果不想要引入Shuffle,而且需求只是降低并行度,那么就可以使用coalesce函数,其使用方式和repartition一致,需要传入一个int类型的行参,来进行并行度的调整

Var rdd2 = rdd.coalesce(2)

这样就可以进行并行度的降低了

其之所以不会进行shuffle操作,主要是因为其主要进行了将同一个Executor中的不同分区进行了合并,避免了跨Executor的数据分发,从而避免了Shuffle

 

之后就是和结果收集相关的算子

基本可以分为两类

一类是将计算结果从各个Executors收集到Driver端,第二类则是将Executors持久化到文件系统

首先是第一种,将数据从Executors收集到Driver端,常见的算子有first,take,collect,

比如我们有一个RDD

Rdd.first

Rdd.take(5)

Rdd.collect

其中first用于从其中提取任意一条数据,而take则是收集多条记录,函数中传入一个int类型用来指定获取多少条数据

最后collect则是从中获取全量数据,不过需要注意一点,由于全量数据太大,全部存入Driver端,可能有OOM的风险,而且即使不导致OOM,也会导致大量的网络开销

 

而更为推荐的则是将其持久化到磁盘,使用saveAsTextFile即可

使用方式很简单,直接调用saveAsTextFile(path:String)即可,path代表的是文件系统目录,可以是本地文件系统,可以是HDFS,S3等分布式文件系统

如果使用了saveAsTextFile算子之后,可以通过Executors直接将RDD进行落盘,从而避免了和Driver端的交互

 

那么总结总结一下本章,本章我们说了很多RDD的算子,涉及了不同的生命周期阶段

数据准备阶段有union,sample进行联合和取样

数据预处理 coalesce,repartition 分别对数据的partition进行缩减以及调整

数据收集 take first collect 进行收集RDD

saveAsTextFile 进行落盘

 

发表评论

邮箱地址不会被公开。 必填项已用*标注