SparkSQL DataFrame: Advanced Topics

1. Creating a SparkSession (2.0) and a SQLContext instance (1.x)

1. Creating a SparkSession (2.0)
// Since Spark 2.0, SparkSession replaces creating a SparkContext and a SQLContext separately
import org.apache.spark.sql.SparkSession
val spark= SparkSession.builder().appName("SparkSQLTest").getOrCreate()
val numbers=spark.range(1 ,10, 2)
numbers.show()
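As a minimal sketch of what "replaces" means in practice, the snippet below (written for spark-shell or a Scala script) builds a session and then reaches the older entry points through it. The `master("local[*]")` setting and the `ids` helper value are assumptions added here so the example runs without a cluster; they are not part of the original snippet.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: run locally so no cluster is needed for this sketch
val spark = SparkSession.builder()
  .appName("SparkSQLTest")
  .master("local[*]")
  .getOrCreate()

// The 1.x entry points are still reachable through the session
val sc = spark.sparkContext
val sqlContext = spark.sqlContext

// range(start, end, step): end is exclusive, the single column is named "id"
val numbers = spark.range(1, 10, 2)
numbers.show()

// Hypothetical helper value: pull the ids back to the driver for inspection
val ids = numbers.collect().map(_.longValue).toSeq
```

Code written against `sc` or `sqlContext` in the 1.x style can therefore keep working on top of a single `SparkSession`.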

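2. Creating a SQLContext instance (1.x)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
val conf= new SparkConf().setAppName("SQL_Advanced_case").setMaster("local")
val sc=new SparkContext(conf)
val sqlContext=new SQLContext(sc)
import sqlContext.implicits._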

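2. Creating a DataFrame

Approach 1: without creating an RDD

Use the createDataFrame method to build a DataFrame directly from a List. The drawback is that the resulting DataFrame has no meaningful column names: the columns get the default names _1, _2, and so on.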

val customerData=List(("Alex","浙江",39,230.00), ("Bob","北京", 18, 170.00), ("Chris", "江苏", 45, 529.95), ("Dave", "北京", 25, 99.99), ("Ellie", "浙江", 23, 1299.95), ("Fred", "北京", 21, 1099.00))
val customerDF1=sqlContext.createDataFrame(customerData)
customerDF1.printSchema

Approach 2: without creating an RDD

Use the createDataFrame method to build a DataFrame directly from a List. Even with a case class defined, a DataFrame created by calling createDataFrame on a List of tuples still gets only the default column names _1, _2, and so on.
val customerData=List(("Alex","浙江",39,230.00), ("Bob","北京", 18, 170.00), ("Chris", "江苏", 45, 529.95), ("Dave", "北京", 25, 99.99), ("Ellie", "浙江", 23, 1299.95), ("Fred", "北京", 21, 1099.00))
case class CustomerInfo(customer:String, province:String, age: Int, total:Double )
// cDataRDD is not defined in this snippet; the line below only compiles against an RDD[String] such as customerRDD in Approach 3.
// val cdf=cDataRDD.map(x=>x.split(",")).map(x=>CustomerInfo(x(0),x(1),x(2).toInt,x(3).toDouble)).toDF
val customerDF1=sqlContext.createDataFrame(customerData)
customerDF1.printSchema
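The missing column names can be supplied without going through an RDD. The sketch below, meant for spark-shell or a Scala script, shows two options: passing names to `toDF`, or converting the tuples to the case class before calling `createDataFrame`. The shortened data set, the `local[*]` master, and the value names `unnamed`, `named`, and `fromCaseClass` are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session so the sketch runs on its own
val spark = SparkSession.builder().appName("NamedColumns").master("local[*]").getOrCreate()
import spark.implicits._

case class CustomerInfo(customer: String, province: String, age: Int, total: Double)

val customerData = List(("Alex", "浙江", 39, 230.00), ("Bob", "北京", 18, 170.00))

// Tuples carry no field names, so the columns default to _1 .. _4
val unnamed = spark.createDataFrame(customerData)

// Option 1: assign all column names at once with toDF
val named = unnamed.toDF("customer", "province", "age", "total")

// Option 2: convert each tuple to the case class; its field names become column names
val fromCaseClass = spark.createDataFrame(
  customerData.map { case (c, p, a, t) => CustomerInfo(c, p, a, t) })

named.printSchema()
fromCaseClass.printSchema()
```

Option 1 is usually the shorter route when the data already exists as tuples; option 2 keeps the schema definition in one place if the case class is reused elsewhere.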

Approach 3: the traditional way

First create an RDD and a case class, then build the DataFrame with toDF. This time the DataFrame carries the column information.
val customerData=Array("Alex,浙江,39,230.00","Bob,北京,18,170.00","Chris,江苏,45,529.95","Dave,北京,25,99.99","Ellie,浙江,23,1299.95","Fred,北京,21,1099.00")
case class CustomerInfo(customer:String, province:String, age: Int, total:Double )
val customerRDD=sc.makeRDD(customerData)
val customerDF1=customerRDD.map(x=>x.split(",")).map(x=>CustomerInfo(x(0),x(1),x(2).toInt,x(3).toDouble)).toDF
customerDF1.printSchema

Miscellaneous

Renaming columns

val customerDF=customerDF1.withColumnRenamed("_1","customer").withColumnRenamed("_2","province").withColumnRenamed("_3", "age").withColumnRenamed("_4", "total")

Viewing the schema

customerDF.printSchema

Basic statistics for numeric columns

customerDF.describe().show
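To make the shape of the describe result concrete, here is a small sketch for spark-shell or a Scala script. describe returns an ordinary DataFrame with a `summary` column plus one string-typed column per described field. The `local[*]` master and the value names `stats` and `summaryLabels` are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session so the sketch runs on its own
val spark = SparkSession.builder().appName("DescribeSketch").master("local[*]").getOrCreate()
import spark.implicits._

val customerDF = Seq(
  ("Alex", "浙江", 39, 230.00), ("Bob", "北京", 18, 170.00), ("Chris", "江苏", 45, 529.95),
  ("Dave", "北京", 25, 99.99), ("Ellie", "浙江", 23, 1299.95), ("Fred", "北京", 21, 1099.00)
).toDF("customer", "province", "age", "total")

// Restrict the statistics to the two numeric columns
val stats = customerDF.describe("age", "total")
stats.show()

// The first column labels each statistic
val summaryLabels = stats.select("summary").collect().map(_.getString(0)).toSeq
```

Because the result is itself a DataFrame, it can be filtered or joined like any other, for example to pick out only the `mean` row.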

3. DataFrame methods

select

import org.apache.spark.sql.functions.col // needed for col(...)
customerDF.select(customerDF.col("customer")).show
customerDF.select(customerDF("customer")).show
customerDF.select("customer", "province").show
customerDF.select($"customer", $"province").show
customerDF.select(col("customer"), col("province")).show
customerDF.select(customerDF("customer"), col("province")).show
customerDF.select("customer", $"province").show // Error: a String and a $-column cannot be mixed.
customerDF.select(col("customer"), $"province").show // Is this valid? Yes: both arguments are Columns.
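The reason mixing fails is that select is overloaded: one variant takes only String names, the other only Column objects, and no variant accepts both. The sketch below (spark-shell or Scala script style) exercises each overload. The one-row data set, the `local[*]` master, and the value names `byName` and `byColumn` are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session so the sketch runs on its own
val spark = SparkSession.builder().appName("SelectSketch").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("Alex", "浙江", 39)).toDF("customer", "province", "age")

// Overload 1: select(col: String, cols: String*), every argument is a column name
val byName = df.select("customer", "province")

// Overload 2: select(cols: Column*), every argument is a Column;
// col(...), $"..." and df(...) all produce Columns, so they mix freely
val byColumn = df.select(col("customer"), $"province", df("age"))

byName.show()
byColumn.show()
```

When a String name has to sit next to an expression, wrapping the name in `col(...)` moves the whole call onto the Column overload.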

Using expressions

(methods on the Column object)

customerDF.select($"customer", ($"age"*2)+10, $"province"==="浙江").show // evaluates the relational expression $"province"==="浙江" for every row

as, alias: renaming columns

customerDF.select($"customer" as "name", ($"age"*2)+10 alias "newAge", $"province"==="浙江" as "isZJ").show

lit: adding a column

import org.apache.spark.sql.functions.{lit, when} // lit and when are methods in org.apache.spark.sql.functions
val cdf1=customerDF.select($"customer", $"age", when($"age"<20,1).when($"age"<30, 2).otherwise(3) as "ageGroup", lit(false) as "trusted")
cdf1.show
cdf1.printSchema

drop

val cdf2=cdf1.drop("trusted")
cdf2.show
cdf2.printSchema

distinct

customerDF.select($"province").distinct.show

filter

customerDF.filter($"age">30).show
customerDF.filter("age>30").show
customerDF.filter($"age"<=30 and $"province"==="浙江" )
customerDF.filter("age<=30 and province ='浙江'") // SQL string literals need straight single quotes
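The Column form and the SQL-string form of a filter express the same predicate. A minimal sketch for spark-shell or a Scala script, assuming a `local[*]` master and the illustrative value names `byColumn`, `bySql`, and `namesFromSql`:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session so the sketch runs on its own
val spark = SparkSession.builder().appName("FilterSketch").master("local[*]").getOrCreate()
import spark.implicits._

val customerDF = Seq(
  ("Alex", "浙江", 39, 230.00), ("Bob", "北京", 18, 170.00), ("Chris", "江苏", 45, 529.95),
  ("Dave", "北京", 25, 99.99), ("Ellie", "浙江", 23, 1299.95), ("Fred", "北京", 21, 1099.00)
).toDF("customer", "province", "age", "total")

// Column expression: === compares a column with a value, && combines two conditions
val byColumn = customerDF.filter($"age" <= 30 && $"province" === "浙江")

// Equivalent SQL expression string; the literal uses straight single quotes
val bySql = customerDF.filter("age <= 30 and province = '浙江'")

byColumn.show()
bySql.show()

val namesFromSql = bySql.select("customer").collect().map(_.getString(0)).toSeq
```

Both filters keep only Ellie, the one customer from 浙江 aged 30 or younger.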

Aggregation operations

withColumn

Adds a new column to an existing DataFrame without removing any of the existing columns.

val customerAgeGroupDF=customerDF.withColumn("agegroup", when($"age"<20, 1).when($"age"<30, 2).otherwise(3))
customerAgeGroupDF.show

groupBy

Returns a GroupedData object (RelationalGroupedDataset in 2.0)

// which wraps a large number of aggregation methods.
customerAgeGroupDF.groupBy("agegroup").max().show()
customerAgeGroupDF.groupBy("agegroup","province").count().show()
customerAgeGroupDF.groupBy("agegroup").min("age", "total").show()

agg

customerAgeGroupDF.groupBy("agegroup").agg(sum($"total"), min($"total")).show()
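By default the aggregated columns get generated names such as sum(total). The sketch below, for spark-shell or a Scala script, rebuilds the age-group DataFrame and gives each aggregate an explicit name with `as`. The `local[*]` master and the names `summary`, `totalSum`, and `totalMin` are assumptions chosen for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{min, sum, when}

// Assumption: a local session so the sketch runs on its own
val spark = SparkSession.builder().appName("AggSketch").master("local[*]").getOrCreate()
import spark.implicits._

val customerAgeGroupDF = Seq(
  ("Alex", "浙江", 39, 230.00), ("Bob", "北京", 18, 170.00), ("Chris", "江苏", 45, 529.95),
  ("Dave", "北京", 25, 99.99), ("Ellie", "浙江", 23, 1299.95), ("Fred", "北京", 21, 1099.00)
).toDF("customer", "province", "age", "total")
  .withColumn("agegroup", when($"age" < 20, 1).when($"age" < 30, 2).otherwise(3))

// Several aggregates in one pass, each with a readable column name
val summary = customerAgeGroupDF
  .groupBy("agegroup")
  .agg(sum($"total").as("totalSum"), min($"total").as("totalMin"))
  .orderBy("agegroup")

summary.show()
```

Naming the aggregates up front avoids having to quote names like `sum(total)` in later select or filter calls.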

pivot

customerAgeGroupDF.groupBy("province").pivot("agegroup").sum("total").show()
customerAgeGroupDF.groupBy("province").pivot("agegroup",Seq(1,2)).agg(sum($"total")).show() // agg needs an aggregate expression, not a bare column name
customerAgeGroupDF.groupBy("province").pivot("agegroup",Seq(2,3)).agg(sum($"total"), min($"total")).filter($"province"=!="北京").show
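pivot turns the distinct values of one column into output columns. A small sketch for spark-shell or a Scala script shows the resulting column layout, with and without an explicit value list. The `local[*]` master and the value names `allGroups` and `restricted` are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.when

// Assumption: a local session so the sketch runs on its own
val spark = SparkSession.builder().appName("PivotSketch").master("local[*]").getOrCreate()
import spark.implicits._

val customerAgeGroupDF = Seq(
  ("Alex", "浙江", 39, 230.00), ("Bob", "北京", 18, 170.00), ("Chris", "江苏", 45, 529.95),
  ("Dave", "北京", 25, 99.99), ("Ellie", "浙江", 23, 1299.95), ("Fred", "北京", 21, 1099.00)
).toDF("customer", "province", "age", "total")
  .withColumn("agegroup", when($"age" < 20, 1).when($"age" < 30, 2).otherwise(3))

// No value list: Spark first computes the distinct agegroup values (1, 2, 3)
val allGroups = customerAgeGroupDF.groupBy("province").pivot("agegroup").sum("total")

// Explicit value list: only these become columns, and the extra distinct-value pass is skipped
val restricted = customerAgeGroupDF.groupBy("province").pivot("agegroup", Seq(1, 2)).sum("total")

allGroups.show()
restricted.show()
```

A province with no customers in a given age group gets null in that cell, as 江苏 does for groups 1 and 2 here.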

sort

customerDF.orderBy("age").show
customerDF.orderBy($"age").show
customerDF.orderBy(desc("age")).show()
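/* Here orderBy must come after the aggregation call: groupBy returns a GroupedData object,
whose aggregation methods return a DataFrame, and orderBy is a DataFrame method. Calling it
directly after groupBy fails with an error saying orderBy is not a member of GroupedData. */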
customerAgeGroupDF.groupBy("agegroup","province").count().orderBy($"agegroup".desc).show()
customerDF.sort($"age".asc).show(5)
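The ordering rule described in the comment above can be sketched as follows (spark-shell or Scala script style): aggregate first, then sort the resulting DataFrame, here by the generated `count` column. The `local[*]` master and the value names `perProvince` and `top` are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session so the sketch runs on its own
val spark = SparkSession.builder().appName("SortSketch").master("local[*]").getOrCreate()
import spark.implicits._

val customerDF = Seq(
  ("Alex", "浙江", 39, 230.00), ("Bob", "北京", 18, 170.00), ("Chris", "江苏", 45, 529.95),
  ("Dave", "北京", 25, 99.99), ("Ellie", "浙江", 23, 1299.95), ("Fred", "北京", 21, 1099.00)
).toDF("customer", "province", "age", "total")

// groupBy(...) alone is not sortable; count() turns it back into a DataFrame
val perProvince = customerDF.groupBy("province").count().orderBy($"count".desc)
perProvince.show()

// The province with the most customers comes first
val top = perProvince.first()
```

sort and orderBy are interchangeable on a DataFrame, so the same chain works with `.sort($"count".desc)`.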