我们学过了RDD的常用算子,不过这些算子都是作用于RDD上,而RDD是以数据分区为粒度,每一个分区对应一个Executor,也就是对应的局部数据

而当我们想要设置一个全局使用的变量的时候,如果不使用Spark提供的特殊原语,就只能将这个变量设置在每一个Executor中,这就导致每一个线程都会拥有这一个变量,对于这种情况,会导致占用大量的内存资源,而且在run的时候,会因为需要分发而占用大量的网络资源。为了这种大量分发,大量占用资源的情况不发生,所以我们引入了广播变量这个杀手锏

 

广播变量的使用,我们还是拿wordCount看下

如果不使用广播变量,代码如下

Val list: List[String] = List(“Apache”,”Spark”)

WordRDD.filter(word => list.contains(word))

 

而如果是使用广播变量的话,代码则可以如下

Val list: List[String] = List(“Apache”,”Spark”)

Val bc = sc.broadcast(list)

Val cleanWordRDD: RDD[String] = wordRDD.filter(word => bc.value.contains(word))

 

其实就是将访问的方式从直接访问改为了访问广播变量bc.value

这种使用,能够减少内存占用和网络占用,原因是因为原本不使用广播原语,变量会存在每一个线程中,而如果使用广播原语,则是改为一个Executors,也就是JVM下,才会有一个变量存储。

通过这种方式来优化作业

 

除了这一种方式,还有种共享变量,累加器

这个累加器的主要作用很简单,就是用于全局计数,这样的一个变量由Driver端定义,更新是在RDD算子中调用add函数,执行完成,调用累加器的value函数,就可以获取全局计数结果,这样我们拿一个示例来看下

我们接下来仍然修改WordCount来展示其使用方式,我们接下来书写一个函数,在过滤空格的同时,记录空格的出现个数

Val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(“ ”))

Val ac = sc.longAccumulator(“Empty string”)

Def f(x: String): Boolean= {

  If (x.equal(“”)){

   Ac.add(1)

 Return false

}else{

 Return true

}

}

Val filterRDD: RDD[String] = wordRDD.filter(f)

 

Ac.value

上面代码我们最终通过value函数获取到结果集

除了基本的longAccumulator,SparkContext还提供了doubleAccumulator和collectionAccumulator等不同类型的累加器,满足不同场景下的需要

其中collectionAccumulator可以让开发者定义集合类型的累加器

 

这些都是定义累加器变量,然后调用add函数,从而更新累加器状态,最后调用value获取结果

 

总结一下,我们今天说了广播变量的使用,主要有两种,分别是基本的广播变量和累加器

 

发表评论

邮箱地址不会被公开。 必填项已用*标注