这一次我们从代码使用的角度进行学习Spark的相关知识,这一章主要是从运行一个类似Hello World的代码开始,学习Spark的最基本使用。
在Spark上,类似Hello World的代码叫做Word Count,是先对输入文件中单词进行计数,然后打印频率最高的5个单词
那么我们直接看下如何在Spark 中书写并使用这样的Word Count
首先我们需要创建一个相关的运行环境
首先是在本地创建部署一个Spark运行环境,整体流程只需要三个步骤,就可以完成Spark的本地部署。
1. 下载安装包: 从Spark官网下载安装包
2. 解压:解压到本地
3. 配置环境变量:配置解压路径/bin文件 到 PATH环境变量
这样配置之后,打开命令行终端输出 spark-shell –version,如果这个命令可以输出Spark版本好,就表示大功告成了。
其次这个环境运行时,需要依赖于Java和Scala环境。这就需要提前预装相关环境
然后我们就可以书写Word Count相关计算逻辑了
首先是输入源,文件的部分内容如下
我们以Line为单位进行的存储
那么我们首先就需要以行为单位提取文本,关于这一部分的代码如下
var rootPaht: String = _
var file: String = s”${rootPath}/wikiOfSpark.txt”
var lineRDD: RDD[String] = spark.sparkContext.textFile(file)
上面的代码是由scala书写的,这是因为Spark支持多种语言的书写方式,而且执行性能一致,都是根据不同语言形成一致的执行计划,无论是使用Python 还是 Java都有相同的执行性能。
那么回到代码,其中重要的是第三行代码,上面spark和sparkContext这两个对象的来源比较好玩
spark代指 SparkSession实例,会交给系统自动创建。
sparkContext是开发入口SparkContext实例。
然后我们创建了一个RDD对象,RDD指的是弹性分布式数据集,定义了一系列分布式数据的基本属性和处理方法,这个接下来会讲。
其次是讲这个RDD对象中存储的每一行文本转换为单词,这一步可以理解为文本的展开
val workRDD: RDD[String] = lineRDD.flatMap(line => line.split (” “))
这一步根据空格对行元素进行了拆分,拆分后得到的RDD中存储的就是以单词为单位的RDD
然后我们将单词进行过滤后转换为另一格式对象 (Key,1)
val kvRDD = workRDD.filter(word => !wrod.equals(“”)).map(word => (word,1))
这样就进行了过滤,去掉了为空的单词,并且将剩下的单词转换为kv键值对
之后我们就需要根据这个kv键值对进行聚合了,根据key,也就是单词,来统计出现的次数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
获取到了reduce之后的RDD
此时RDD中存储的还是kv键值对
不过我们已经获取到了每个单词的出现次数,最后需要按照词频进行排序,将最高的5个单词打印到屏幕上,代码如下
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)
这样就完成了一个spark wordcount的代码书写
对应的python版本如下
textFile = SparkContext().textFile(“./wikiOfSpark.txt”) wordCount = ( textFile.flatMap(lambda line: line.split(” “)) .filter(lambda word: word != “”) .map(lambda word: (word, 1)) .reduceByKey(lambda x, y: x + y) .sortBy(lambda x: x[1], False) .take(5) ) print(wordCount) |