这一次我们从代码使用的角度进行学习Spark的相关知识,这一章主要是从运行一个类似Hello World的代码开始,学习Spark的最基本使用。

在Spark上,类似Hello World的代码叫做Word Count,是先对输入文件中单词进行计数,然后打印频率最高的5个单词

那么我们直接看下如何在Spark 中书写并使用这样的Word Count

 

首先我们需要创建一个相关的运行环境

首先是在本地创建部署一个Spark运行环境,整体流程只需要三个步骤,就可以完成Spark的本地部署。

1. 下载安装包: 从Spark官网下载安装包

2. 解压:解压到本地

3. 配置环境变量:配置解压路径/bin文件 到 PATH环境变量

 

这样配置之后,打开命令行终端输出 spark-shell –version,如果这个命令可以输出Spark版本好,就表示大功告成了。

其次这个环境运行时,需要依赖于Java和Scala环境。这就需要提前预装相关环境

 

然后我们就可以书写Word Count相关计算逻辑了

首先是输入源,文件的部分内容如下

图片

我们以Line为单位进行的存储

那么我们首先就需要以行为单位提取文本,关于这一部分的代码如下

var rootPaht: String = _

var file: String = s”${rootPath}/wikiOfSpark.txt”

var lineRDD: RDD[String] = spark.sparkContext.textFile(file)

上面的代码是由scala书写的,这是因为Spark支持多种语言的书写方式,而且执行性能一致,都是根据不同语言形成一致的执行计划,无论是使用Python 还是 Java都有相同的执行性能。

 

那么回到代码,其中重要的是第三行代码,上面spark和sparkContext这两个对象的来源比较好玩

spark代指 SparkSession实例,会交给系统自动创建。

sparkContext是开发入口SparkContext实例。

 

 

然后我们创建了一个RDD对象,RDD指的是弹性分布式数据集,定义了一系列分布式数据的基本属性和处理方法,这个接下来会讲。

其次是讲这个RDD对象中存储的每一行文本转换为单词,这一步可以理解为文本的展开

val workRDD: RDD[String] = lineRDD.flatMap(line => line.split (” “))

这一步根据空格对行元素进行了拆分,拆分后得到的RDD中存储的就是以单词为单位的RDD

 

然后我们将单词进行过滤后转换为另一格式对象 (Key,1)

val kvRDD = workRDD.filter(word => !wrod.equals(“”)).map(word => (word,1))

这样就进行了过滤,去掉了为空的单词,并且将剩下的单词转换为kv键值对

之后我们就需要根据这个kv键值对进行聚合了,根据key,也就是单词,来统计出现的次数

val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)

获取到了reduce之后的RDD

此时RDD中存储的还是kv键值对

不过我们已经获取到了每个单词的出现次数,最后需要按照词频进行排序,将最高的5个单词打印到屏幕上,代码如下

wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

这样就完成了一个spark wordcount的代码书写

对应的python版本如下

textFile = SparkContext().textFile(“./wikiOfSpark.txt”)

wordCount = (

    textFile.flatMap(lambda line: line.split(” “))

    .filter(lambda word: word != “”)

    .map(lambda word: (word, 1))

    .reduceByKey(lambda x, y: x + y)

    .sortBy(lambda x: x[1], False)

    .take(5)

)

print(wordCount)

 

 

发表评论

邮箱地址不会被公开。 必填项已用*标注