之前我们说了一些和聚合不相关的转换算子,比如map,mapPartitions,faltMap,filter

这次我们需要讲一下在Spark中和数据聚合相关算子,比如groupByKey,reduceByKey,aggregateByKey和sortByKey,而这些算子都涉及到Shuffle计算,也是Spark中比较重逻辑的一部分。

而且这几个算子,都是面对Paired RDD的,从而进行RDD内数据聚合

那么我们这次就说下这几个算子的使用

1.     groupByKey,按组收集

对于PairedRDD,会先按照Key进行分组,然后把相关的Value值,以集合的形式收集到一起。也就是将RDD[(Key,Value)]转换为RDD[(Key,Value集合)]

使用起来也并不困难,我们拿WordCount来举例

val kvRDD: RDD[(String,Iterable[String])] = kvRDD.groupByKey()

从上面可以看出groupByKey使用并不困难,但是groupByKey存在着性能问题,这是由于groupByKey会将数据进行全量搜集,并在Shuffle后发送到相同的数据分区,这就导致数据量大的时候,会产生大量的磁盘IO或者网络IO,从而严重影响作业的执行性能。

不过一般groupByKey的使用频率不高,而是采用其他的聚合算子,那么我们就看看这些常用的聚合算子

2.     reduceByKey 分组聚合

也就是按照Key值进行聚合,将Key相同的元素,聚合成一个元素

对应的代码可以如下参考

val wordCounts: RDD[(String,Int)] = kvRDD.reduceByKey((x:Int,y:Int) -> x + y)

上面的代码可以理解为

将PairedRDD的相同Key数据的Value进行了相加

而且我们在上面使用的是匿名算子,除了匿名算子,我们可以声明相关的具体函数,比如我们需要提取同一个Key的最大值,就比如如下的函数

def f(x:Int, y:Int): Int = {

return math.max(x,y)

}

然后直接reduceByKey(f)即可

图片

而且在使用过程中,由于Map端和Reduce段使用的聚合逻辑是一致的,是由函数f定义的,所以会进行一些优化,也就是在Map端的时候,就已经进行了预聚合,然后在reduce端进行二度聚合即可

由于这种优化的存在,在进行执行的时候,往往执行效率能至少提升一倍

不过reduceByKey存在的局限性是Map阶段和Reduce阶段的聚合逻辑要保持一致

如果我们希望Map阶段和Reduce阶段的逻辑不一致的话,可以考虑下面的aggregateByKey算子了

3.     aggregateByKey 更加灵活的算子

聚合算子的参数比较多,需要提供三个参数,分别是一个初始值,一个Map端聚合函数f1,一个reduce端聚合函数f2

基本代码如下

rdd.aggregateByKey(初始值,f1,f2)

其中需要注意,f1的参数类型和f2要保持一致,而且f1的行参要和Paired RDD的value类型一致,初始值的类型也要和Paired RDD的value类型一致

具体的代码可以参考如下

def f1(x:Int, y:Int): Int = {

return math.max(x,y)

}

def f1(x:Int, y:Int): Int = {

return x+y

}

val wordCounts: RDD[(String,Int)] = rdd.aggregateByKey(0)(f1,f2)

这样就是在利用了和reduceByKey一样的优化,并且兼容了两种不同的聚合函数

最后则是sortByKey,字面意思是按照Key进行排序,使用方法也很简单,只需要在RDD上调用sortByKey()即可

然后如果需要按照升序或者倒序进行排列的话,只需要传入true或者false即可

如果按照降序进行排序的话,只需要传入false

那么我们总结一下今天说的几个算子,分别是groupByKey,reduceByKey,aggregateByKey和sortByKey

大致说明了这几个算子的用法和性能优化的点

发表评论

邮箱地址不会被公开。 必填项已用*标注