9.RDD中其他的算子
之前我们说了RDD相关的转换算子,以及相关的聚合算子,现在我们说下其中涉及的剩余算子
剩余算子可以大致分为数据准备相关算子,数据预处理算子,收集算子
对于数据准备阶段,相关的算子是union和sample
union算子的主要作用是将两个类型相同,来源不同的RDD进行合并,从而形成一个统一的,更大的分布式数据集,比如如下的代码
我们有两个String类型的RDD,进行合并则是如下
Val rdd1: RDD[String] Val rdd2: RDD[String] Val rdd = Rdd1.union(rdd2) |
对于多个类型一致的RDD,可以通过连续调用union来将所有的数据集进行合并。
其次是sample,用于对RDD进行随机采样,从一份大数据变为一份小数据。其中的参数有withReplacement,faction和seed
其中withReplacement的类型是Boolean,目的是采样是否有放回,具体是用于去重的,如果设置为true,就可能有重复数据,相反为false,就会进行去重处理
然后是faction参数,类型是Double,值是从0到1,具体是指的结果集和原集的尺寸比例,seed含义是种子,如果种子相同,那么采样的结果必然相同
Rdd.sample(false,0.1).collect //每次采样结果不同
Rdd.sample(false,0.1,123).collect //每次采样结果相同 |
这就是关于数据准备相关的算子了
其次是数据预处理相关的算子
分别是repartition和 coalesce
这两个是设置一个RDD并行度,另一个是专门用来降低RDD并行度
Repartition用于设置RDD的数据分区数量,coalesce用于降低RDD的并行度
首先我们可以通过
Rdd.partitions.length获取到RDD的并行度
然后可以通过
Rdd.repartition(8)
来设置并行度
而这个并行度,就是我们RDD的数据分区,每一个分布式Task,都需要一个CPU线程来执行
因此RDD的并行度,很大程度决定了分布式系统中CPU的使用效率,那么关于并行度的设置,一般可以考虑设置为可用CPU的2-3倍
不过需要注意,在使用了repartition之后,其会引入Shuffle操作,关于这一操作的引入很好理解,因为分区扩散之后,必然会导致重新清洗数据
如果不想要引入Shuffle,而且需求只是降低并行度,那么就可以使用coalesce函数,其使用方式和repartition一致,需要传入一个int类型的行参,来进行并行度的调整
Var rdd2 = rdd.coalesce(2)
这样就可以进行并行度的降低了
其之所以不会进行shuffle操作,主要是因为其主要进行了将同一个Executor中的不同分区进行了合并,避免了跨Executor的数据分发,从而避免了Shuffle
之后就是和结果收集相关的算子
基本可以分为两类
一类是将计算结果从各个Executors收集到Driver端,第二类则是将Executors持久化到文件系统
首先是第一种,将数据从Executors收集到Driver端,常见的算子有first,take,collect,
比如我们有一个RDD
Rdd.first
Rdd.take(5)
Rdd.collect
其中first用于从其中提取任意一条数据,而take则是收集多条记录,函数中传入一个int类型用来指定获取多少条数据
最后collect则是从中获取全量数据,不过需要注意一点,由于全量数据太大,全部存入Driver端,可能有OOM的风险,而且即使不导致OOM,也会导致大量的网络开销
而更为推荐的则是将其持久化到磁盘,使用saveAsTextFile即可
使用方式很简单,直接调用saveAsTextFile(path:String)即可,path代表的是文件系统目录,可以是本地文件系统,可以是HDFS,S3等分布式文件系统
如果使用了saveAsTextFile算子之后,可以通过Executors直接将RDD进行落盘,从而避免了和Driver端的交互
那么总结总结一下本章,本章我们说了很多RDD的算子,涉及了不同的生命周期阶段
数据准备阶段有union,sample进行联合和取样
数据预处理 coalesce,repartition 分别对数据的partition进行缩减以及调整
数据收集 take first collect 进行收集RDD
saveAsTextFile 进行落盘