SparkSQL DataFrame基础篇

#SparkSQL DataFrame基础篇

1
2
3
4
5
6
7
Spark 2.2及以后的SparkSession替换了Spark以前版本中的SparkContextSQLContext,为Spark集群提供了唯一的入口点。
val spark =SparkSession.builder().
appName(“SparkExample”).
getOrCreate()
为了向后兼容,SparkSession对象包含SparkContextSQLContext对象。当使用交互式Spark shell时,创建一个SparkSession类型对象名为spark。

因此该文档里所有的SQLContext在spark2.2+中都可以替换成spark

基于反射机制创建DataFrame

1
2
3
4
5
6
7
8
9
10
11
//students.txt
160201,Michael,17
160101,Andy,23
160301,justin,23
160202,John,22
160102,Herry,17
160203,Brewster,18
160302,Brice,20
160303,Justin,25
160103,Jerry,22
160304,Tom,24

不结合hive,使用spark实例(适用于spark1+)
val spark=new org.apache.spark.sql.spark(sc)
import spark.implicits._

结合hive后,访问hive时使用HiveContext实例
val hiveContext=new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._

这里与Spark2.2+有所不同

1
2
3
4
5
6
case class Student(id: String, name : String, age: Int)
val rdd=sc.textFile("hdfs://master:9000/sqldata/students.txt").map(_.split(",")).map(p => Student(p(0), p(1), p(2).trim.toInt))
val students = rdd.toDF()
students.registerTempTable("tb_students")
val youngstudents=spark.sql("SELECT name FROM tb_students WHERE age>=19 AND age<=22")
youngstudents.show

基于编程创建DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.apache.spark.sql.types._   
import org.apache.spark.sql._
import org.apache.spark._

val students=sc.textFile("hdfs://master:9000/sqldata/students.txt")

val schemaString="id name age"

val schema=StructType(schemaString.split(" ").map(fieldname=> StructField(fieldname, StringType, true)))

val rowRDD=students.map(_.split(",")).map(p=>Row(p(0), p(1), p(2).trim))

val studentsDataFrame=spark.createDataFrame(rowRDD, schema)

studentsDataFrame.registerTempTable("tb_students")

val names=spark.sql("SELECT name FROM tb_students")

names.show

基于DataFrame创建Json文件

1.创建DataFrame

1
2
3
4
import spark.implicits._
case class Student(id: String, name : String, age: Int)
val rdd=sc.textFile("hdfs://master:9000/sqldata/students.txt").map(_.split(",")).map(p => Student(p(0), p(1), p(2).trim.toInt))
val students = rdd.toDF()

2.另存为json类型文件

1
2
3
4
5
6
7
8
//students.save("hdfs://master:9000/sqldata/students.json")//无法执行

spark1.4及以后,dataframe中的save方法不建议使用,有的直接被弃用,
使用DataFrame中的write方法返回一个DataFrameWriter类型对象,再使用里面的save方法、format().save()、parquet()等方法

students.write.save("hdfs://master:9000/sqldata/students.json")
students.write.format("json").save("hdfs://master:9000/sqldata/s1.json")//Spark 1.4 DataFrameWriter中方法format、save
students.write.json("hdfs://master:9000/sqldata/s2.json")//Spark 1.4 DataFrameWriter中方法json

基于DataFrame创建Parquet文件

1.创建DataFrame

1
2
3
4
import spark.implicits._
case class Student(id: String, name : String, age: Int)
val rdd=sc.textFile("hdfs://master:9000/sqldata/students.txt").map(_.split(",")).map(p => Student(p(0), p(1), p(2).trim.toInt))
val students = rdd.toDF()

2.另存为parquet类型文件

1
2
3
4
5
6
7
8
//students.save("hdfs://master:9000/sqldata/students.parquet")//无法执行

spark1.4及以后,dataframe中的save方法不建议使用,有的直接被弃用,
使用DataFrame中的write方法返回一个DataFrameWriter类型对象,再使用里面的save方法、format().save()、parquet()等方法

students.write.save("hdfs://master:9000/sqldata/students.parquet")
students.write.format("parquet").save("hdfs://master:9000/sqldata/s1.parquet")
students.write.parquet("hdfs://master:9000/sqldata/s2.parquet")

基于Json创建DataFrame

1
2
3
4
5
6
7
8
9
//read方法是spark中的方法,返回值类型是DataFrameReader,以往的jsonFile方法已经不建议使用 
val students=spark.read.json("hdfs://master:9000/sqldata/s2.json")//read.json()是最通用的一种方法s1.json,s2.json都可读
students.registerTempTable("tb_students")
spark.sql("select * from tb_students").show
/*
val students=spark.read.load("hdfs://master:9000/sqldata/students.json")//可执行,但换做读s1.json,s2.json不可读
val students=spark.read.load("hdfs://master:9000/sqldata/s1.json") //不可执行,load默认读取parquet类型文件
val students=spark.jsonFile("hdfs://master:9000/sqldata/students.json") //jsonFile不可用
*/

基于Parquet创建DataFrame

1
2
3
4
5
6
7
8
9
//read方法是spark中的方法,返回值类型是DataFrameReader,以往的parquetFile方法已经不建议使用 
val students=spark.read.parquet("hdfs://master:9000/sqldata/students.parquet") //最通用的一种方法,students.parquet,s1.parquet,s2.parquet都可读
students.registerTempTable("tb_students")
spark.sql("select * from tb_students").show
/*
val students=spark.read.load("hdfs://master:9000/sqldata/students.parquet")
val students=spark.load.parquet("hdfs://master:9000/sqldata/s1.parquet")//此处不可执行因为load非SparkSession中方法,Spark1.X中load方法可执行,默认读取parquet文件
val students=spark.parquetFile("hdfs://master:9000/sqldata/students.parquet")//parquetFile不可用
*/

DataFrame的其他操作

1
2
3
4
5
6
7
8
9
10
11
12
students.head    

students.head(3)

students.show(3)

students.columns

students.dtypes

students.printSchema
//更详细的Schema信息

withColumn

1
2
3
students.withColumn("bonus", students("age")*50).show
students.withColumn("bonus", $"age"*50).show//可以运行
students.withColumn("bonus", "age"*50).show//不可运行

withColumnRenamed

1
2
3
val newstudents=students.withColumnRenamed("age", "newage")

newstudents.printSchema

select

1
2
3
4
5
6
7
8
case class Student(id: String, name : String, age: Int)
val rdd=sc.textFile("hdfs://master:9000/sqldata/students.txt").map(_.split(",")).map(p => Student(p(0), p(1), p(2).trim.toInt))
val students = rdd.toDF()

students.select("age", "name").show
students.select( $"age", $"name").show
students.selectExpr("age+1", "name", "abs(age)").show
students.selectExpr("age+1 as newage1", "name as newName", "sqrt(age) as newage2").show

filter

1
2
3
4
5
students.filter("age>20").show
students.filter($"age">20).show
students.filter($"age">23 && $"name"==="Justin").show //可以执行
students.filter('age>20).show //可以执行
students.filter("age>23 && name===Justin").show //不可以执行,字符串中不能用逻辑运算符

where

1
2
3
4
5
students.where('age>20).show   
students.where($"age">20).show
students.where($"age">23 && $"name"==="Justin").show
students.where('age>23 && 'name==="Justin").show
students.where("age>23 && name===Justin").show //不可以执行,字符串中不能用逻辑运算符

orderBy

1
2
3
students.orderBy("age", "id").show(5)
students.orderBy(students("age")).show(5)
students.orderBy($"age").show(5)

groupBy

1
2
3
4
5
6
7
newstudents.groupBy('course_id).mean("score").orderBy('course_id).show

max("col")
min("col")
mean("col")
sum("col")
该四种方法只适用于数值型的GroupedData对象

sort

1
2
3
students.sort("age").show(5)
students.sort($"age".desc).show(5)
students.sort("age".desc).show(5)//不可执行

toDF

1
2
val newstudents=students.toDF("newid", "newage", "newname")
newstudents.printSchema

join

1
2
3
4
case class Score(id:String,course_id:String,score:Int)
val scores_rdd=sc.textFile("hdfs://master:9000/sqldata/scores.txt").map(_.split(",")).map(p => Score(p(0), p(1), p(2).trim.toInt))
val scores = scores_rdd.toDF()
students.join(scores, students("id" )===scores("id"), "outer").show

案例练习

Spark SQL基本案例1

scores.txt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
scores 学号、课程编号、成绩
160201,1,60
160101,2,90
160301,1,70
160202,3,70
160102,3,50
160102,4,95
160302,1,98
160303,2,57
160103,3,64
160304,3,50
160201,2,77
160101,3,57
160301,3,72
160202,2,80
160102,2,58
160102,3,97
160302,4,91
160303,1,67
160103,2,62
160304,4,71

students.txt

1
2
3
4
5
6
7
8
9
10
160201,Michael,17
160101,Andy,23
160301,justin,23
160202,John,22
160102,Herry,17
160203,Brewster,18
160302,Brice,20
160303,Justin,25
160103,Jerry,22
160304,Tom,24

1.利用反射机制创建students.txt对应的DataFrame,其中包含id、name、age三个字段。

1
2
3
val rdd=sc.textFile("hdfs://master:9000/sqldata/students.txt").map(_.split(",")).map(p=>Student(p(0),p(1),p(2).toInt))
students = rdd.toDF
students.show

2.查看所有学生的姓名。

1
students.select('name).show

3.查询所有学生的年龄,并按照年龄降序排序。

1
2
students.select('age).show
students.sort("age".desc).show()

4.查询年龄小于19或年龄大于21的所有学生。

1
students.where($"age">21 || $"age"<19).show

5.添加scholarship字段,每个学生的scholarship项是其龄*20。

1
2
val newstudents = students.withColumn("scholarship",$"age"*20)
newstudents.show(5)

6.将添加scholarship字段后的DataFrame分别以Parquet和JSON格式保存至HDFS上。

1
2
newstudents.write.format("json").save("hdfs://master:9000/sqldata/newstudents.json")
newstudents.write.format("parquet").save("hdfs://master:9000/sqldata/newstudents.parquet")

7.利用SQL语句实现案例1中2-4。

1
2
3
students.registerTempTable("tb_students")

spark.sql("SELECT name FROM tb_students WHERE age>=19 AND age<=21")

8.利用自定义接口创建scores.txt对应的DataFrame,其中包含id、course_id、score三个字段。

1
2
3
4
5
val scores_rdd=sc.textFile("hdfs://master:9000/sqldata/scores.txt").map(_.split(",")).map(p => Score(p(0), p(1), p(2).trim.toInt))

val scores = scores.toDF()
val newstudents = students.join(scores, students("id" )==scores("id"), "outer")
newstudents.show

9.按照课程编号分组,查看每门课的平均成绩,并按课程编号升序排序。

1
newstudents.groupBy('course_id).mean("score").orderBy('course_id).show

10.按照学生编号分组,查看个学生的姓名和其所有课程的平均成绩,并在统计结果中筛选出平均成绩大于72的同学。

1
2
val newstudents = students.join(scores, students("id" )==scores("id"), "outer")
newstudents.groupBy($"name").mean("score").where($"avg(score)">72).orderBy("name").show

Spark SQL基本案例2

students.txt

1
2
3
4
5
6
7
8
9
10
11
学号/姓名/性别/年龄/学年/系别/
160201,Michael,male,37,2012,2,2
160101,Rose,female,33,2011,1,1
160301,justin,male,23,2013,3,3
160202,John,male,22,2012,2,2
160102,Lucy,female,27,2011,2,1
160203,Brewster,male,37,2012,1,2
160302,Susan,female,30,2013,1,3
160303,Justin,male,23,2013,3,3
160103,John,male,22,2011,2,1
160304,Lucy,female,27,2013,3,3

departments.txt

1
2
3
Computer,1
Math,2
Art,3

projects.txt

1
2
3
4
5
6
7
8
9
学号/创新项目编号/学年/
160201,XC2014001,2014,16,64,2
160101,XC201213,2012,32,48,3
160301,RW201103,2011,32,48,3
160202,XC2014002,2014,16,64,1
160102,XC2013002,2013,16,64,2
160102,XC2012011,2012,16,48,2
160302,RW201401,2014,32,32,2
160304,RW201503,2015,16,32,1

scores.txt

1
2
3
4
5
6
7
8
9
160201,90
160101,83
160301,80
160202,70
160102,67
160102,89
160302,91
160303,58
160103,64

scholarship.txt

1
2
3
4
5
6
7
8
160201,2013,2000
160201,2014,3000
160102,2013,2000
160101,2013,2000
160301,2014,2000
160302,2014,2000
160302,2015,3000
160302,2016,4000

1.查询创建的五个表的概要信息。

1
2


2.查询各院系学生总数。

1
2


3.查询各院系学生奖学金的总和并排序。

1
2


4.查询各院系学生的平均学分绩值并排序。

1
2


5.统计各学院每年学生参与创新项目所获得的创新学分数总数。

1
2