我们基本上说完了Spark的常用算子和一些架构概念,可以面对一些开发任务了,但是Spark强大之处在于其还有很多更高层的计算框架,比如Spark SQL,Structured Streaming和Spark MLlib的常规开发方法,来面对不同的数据应用场景

那么,我们就首先从Spark SQL开始进行讲解

这次我们的示例是拿的北京市小轿车摇号来进行的讲解

目标是统计参与摇号的司机朋友,真正摇到号的时候的摇号倍率

究竟是在8倍的时候摇到号了?还是10倍的时候?

首先我们呢准备了一些数据,从11年到19年的摇号数据,其中目录结构如下

图片

分为了apply和lucky两个目录,apply目录是所有申请号码,lucky则是所有中签号码

首先我们要做这样的一个需求,必须要将数据读入到Spark中,而在Spark SQL中,我们一般使用DataFrame进行数据读取

import org.apache.spark.sql.DataFrame

val rootPath: String = _

// 申请者数据

val hdfs_path_apply: String = s”${rootPath}/apply”

// spark是spark-shell中默认的SparkSession实例

// 通过read API读取源文件

val applyNumbersDF: DataFrame = spark.read.parquet(hdfs_path_apply)

// 数据打印

applyNumbersDF.show

// 中签者数据

val hdfs_path_lucky: String = s”${rootPath}/lucky”

// 通过read API读取源文件

val luckyDogsDF: DataFrame = spark.read.parquet(hdfs_path_lucky)

// 数据打印

luckyDogsDF.show

我们分别读入了apply和lucky的数据,利用的SparkSession的read API进行的读取,读取之后形成了一个DataFrame,最后利用DataFrame的show,完成了数据展示

其中SparkSession是Spark在2.0之后提供的新一代接口,SparkContext通过textFile读取文件,而SparkSession则是通过read API将数据源转换为DataFrame

而这个DataFrame,比起RDD,则带有着Schema,可以简单的看作是数据库中的一张二维表

而我们show出来的结果就可以应验我们上面的说法

图片

无论是哪个show,得到的结果都会是包含两个字段carNum,batchNum

carNum是申请号码,而batchNUm代表着摇号批次

那么有了这个样本之后,我们回到我们最初的需求,也就是计算中签率和中签时间的关系

也就是查看抽签者在什么倍率下抽到了签

那么首先第一步,我们要过滤16年之前的中签数据,因为那时候中签规则还没实施

// 过滤2016年以后的中签数据,且仅抽取中签号码carNum字段

val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col(“batchNum”) >= “201601”)

// 摇号数据与中签数据做内关联,Join Key为中签号码carNum

val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq(“carNum”), “inner”)

首先我们对luckDogsDF进行过滤,然后进行join

我们调用了join算子,完成两个算子的关联,join算子有三个参数,分别是另一个DataFrame,关联的字段,关联的形式,譬如inner作为内关联,left代表左关联

然后我们计算每个用户在摇到号的时候,是多少倍倍率了

也就是group之后count

val multipliers: DataFrame = jointDF.groupBy(col(“batchNum”),col(“carNum”)) .agg(count(lit(1)).alias(“multiplier”))

通过groupBy来聚合号码,然后利用agg和count算子来聚合出现的次数,然后使用alias来将聚合后的数据命名为multiplier

图片

不过这样我们获得数据,有着不同倍率下的数据,对于同一个号码,我们只需要保留一个最大的倍率,这样我们就需要对上面的数据进行聚合,获取最大的multiplier

val uniqueMultipliers: DataFrame = multipliers.groupBy(“carNum”)..agg(max(“multiplier”).alias(“multiplier”))

这里我们groupBy carNum,然后还用agg和max算子来保留倍率最大值

图片

最后我们统计不同倍率下中签的人数

val result: DataFrame = uniqueMultipliers.groupBy(“multiplier”) .agg(count(lit(1)).alias(“cnt”))

.orderBy(“multiplier”)

这里我们那multiplier进行了聚合,然后统计了不同倍率下的人数

获取到了最后的数据集

那么总结一下,我们学习了Spark SQL,通过一个简单的示例,讲解了如何读取,以及上面存在的转换算子。

发表评论

邮箱地址不会被公开。 必填项已用*标注