我们学过了RDD的常用算子,不过这些算子都是作用于RDD上,而RDD是以数据分区为粒度,每一个分区对应一个Executor,也就是对应的局部数据
而当我们想要设置一个全局使用的变量的时候,如果不使用Spark提供的特殊原语,就只能将这个变量设置在每一个Executor中,这就导致每一个线程都会拥有这一个变量,对于这种情况,会导致占用大量的内存资源,而且在run的时候,会因为需要分发而占用大量的网络资源。为了这种大量分发,大量占用资源的情况不发生,所以我们引入了广播变量这个杀手锏
广播变量的使用,我们还是拿wordCount看下
如果不使用广播变量,代码如下
Val list: List[String] = List(“Apache”,”Spark”) WordRDD.filter(word => list.contains(word)) |
而如果是使用广播变量的话,代码则可以如下
Val list: List[String] = List(“Apache”,”Spark”) Val bc = sc.broadcast(list) Val cleanWordRDD: RDD[String] = wordRDD.filter(word => bc.value.contains(word)) |
其实就是将访问的方式从直接访问改为了访问广播变量bc.value
这种使用,能够减少内存占用和网络占用,原因是因为原本不使用广播原语,变量会存在每一个线程中,而如果使用广播原语,则是改为一个Executors,也就是JVM下,才会有一个变量存储。
通过这种方式来优化作业
除了这一种方式,还有种共享变量,累加器
这个累加器的主要作用很简单,就是用于全局计数,这样的一个变量由Driver端定义,更新是在RDD算子中调用add函数,执行完成,调用累加器的value函数,就可以获取全局计数结果,这样我们拿一个示例来看下
我们接下来仍然修改WordCount来展示其使用方式,我们接下来书写一个函数,在过滤空格的同时,记录空格的出现个数
Val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(“ ”)) Val ac = sc.longAccumulator(“Empty string”) Def f(x: String): Boolean= { If (x.equal(“”)){ Ac.add(1) Return false }else{ Return true } } Val filterRDD: RDD[String] = wordRDD.filter(f)
Ac.value |
上面代码我们最终通过value函数获取到结果集
除了基本的longAccumulator,SparkContext还提供了doubleAccumulator和collectionAccumulator等不同类型的累加器,满足不同场景下的需要
其中collectionAccumulator可以让开发者定义集合类型的累加器
这些都是定义累加器变量,然后调用add函数,从而更新累加器状态,最后调用value获取结果
总结一下,我们今天说了广播变量的使用,主要有两种,分别是基本的广播变量和累加器