上一节我们说了Spark SQL这一构建于Spark Core之上的框架,这一次就让我们说下Spark SQL框架中关键的数据类型DataFrame相关的数据处理算子,主要可以分为两类,一类是结构化查询语言也就是sql,另一类则是DataFrame相关的开发算子。

那么我们先说SQL语句,对于任意的DataFrame,我们都可以使用createTempView或者createGlobalTempView 在Spark SQL中创建临时数据表

两者一个是创建的临时表,其生命周期仅限于SparkSession内部,createGlobalTempView创建的临时表,可以在同一个应用程序中跨SparkSession提供访问

我们来展示下如何使用createTempView创建临时表

基本代码如下

import org.apache.spark.sql.DataFrame

import spark.implicits._

val seq = Seq((“Alice”, 18), (“Bob”, 14))

val df = seq.toDF(“name”, “age”)

df.createTempView(“t1”)

val query: String = “select * from t1”

// spark为SparkSession实例对象

val result: DataFrame = spark.sql(query)

result.show 13

/** 结果打印

+—–+—+

| n ame| age|

+—–+—+

| Alice| 18|

| Bob| 14|

+—–+—+

*/

首先是创建数据集,然后toDF来创建DataFrame,之后利用createTempView创建一个临时表,之后利用sql查询table,最后show来展示

首先说,DataFrame之间的转换也属于延迟计算,只有出现Action算子的时候,才会及性能执行

当然,Spark SQL还封装了一些属于自己的函数,比如array_distinct,collect_list

可以浏览官网的Functions页面查找完整的函数列表。

之后是DataFrame相关的算子

主要源于了DataFrame相关的双面性,一方面是基于RDD,故RDD支持的算子,DataFrame都支持,另一方面就是属于自己算子,基本列表如下

图片

我们按照上图的顺序进行讲解

首先是和RDD一样的算子

图片

用法还是返回都一样

然后是探索类算子

主要是用于了解算子,比如实际数据,比如数据格式,执行计划,常见的如下

图片

对于数据格式,columns schema printSchema用途类似,都是负责获取数据的格式

比如我们利用printSchema,获取的结果如下

除了数据格式等信息,还可以使用describe来查看列值的分布统计,比如使用df.describe(“age”)来获取对应age列的极值,平均值,方差等信息

以及使用explain函数,来获取Spark SQL给的执行计划

清洗类算子

我们需要对数据进行清洗,比如删除某些列,删除某些行

图片

其中drop函数负责将指定的列从DataFrame中进行清除,比如我们想要将上面的数据的gender列清除,直接调用employees.DF.drop(“gender”)即可,并且可以通过声明多个入参来清除多个列

其次是distinct,进行去重,以employeesDF为例,当有多个数据记录的字段值相同的时候,进行去重,dropDuplicates也是进行去重,不过是按照某个字段进行去重,比如按照gender进行去重,最后结果只会有男和女

图片

employees.dropDuplicates(“gender”).show

图片

最后一个算子是na,是选取DataFrame的null数据,na往往配合drop或者fill使用,employees.na.drop用于删除DataFrame中的null数据,employees.na.fill(0)则是全部填充0

接下来是转换类的算子

基本如下

图片

首先是select,选取DataFrame下的部分字段

employeesDF.select(“name”,”gender”).show

其次是selectExpr,支持其中不仅可以选择字段,还可以通过函数的方式生成新的字段

employeesDF.select(“name”,”gender“,”concat(id, ‘_’, name)as id_name“0).show

这样我们通过concat函数来将两个列拼接后形成新的数据列

其次是where和withColumnRenamed这两个算子,

where 配合sql语句对DataFrame进行数据过滤,比如

employeesDF.where(“gender = ‘Male’”)

withColumnRenamed对字段进行重命名

withColumn,则是生成新的数据列,类似selectExpr,其中可以利用sql函数

employeesDF.withColumn(“crypto”,hash($“age”)).show

图片

最后一个算子,explode,则是展开数组类型的数据列,生成新的数据记录

比如我们有个数组字段

val seq = Seq( (1, “John”, 26, “Male”, Seq(“Sports”, “News”)), 2 (2, “Lily”, 28, “Female”, Seq(“Shopping”, “Reading”)),

(3, “Raymond”, 30, “Male”, Seq(“Sports”, “Reading”))

)

val employeesDF: DataFrame = seq.toDF(“id”, “name”, “age”, “gender”, “interests”)

employeesDF.withColumn(“interest”, explode($”interests”)).show

得到的结果如下

图片

将数据进行了展开

还有就是分析类的算子

图片

对于join,我们之前演示过,需要我们声明关联键和关联的形式

其次是groupBy,往往配合agg进行使用

比如

Val result = df.groupBy(“gender”).agg(sum(“salary”).as(“sum_salary”))

得到结果

最后是sort或者orderBy两者都是对数据集进行排序,用法和效果是一致的

比如

employeesDF.sort(desc(“sum_salary”,asc(“gender”))).show

到此,我们除了最后的输出算子都说完了,关于持久化算子,则跟read API类似,都由format定义的文件格式,多个写入选项,以及save路径组成

图片

关于不同的option,则可以参考官方文档

比如其中的mode有

图片

总结一下本章,我们讲述DataFrame不同阶段的处理算子

分别是同类算子

探索类算子

清洗类算子

转换类算子

分析类算子

持久化算子

发表评论

邮箱地址不会被公开。 必填项已用*标注