Spark Data Preprocessing 2

2.1 Reading data as an RDD

1. Read the file as an RDD

# Load the raw text file and keep its header line aside
fileData = sc.textFile('/pydata/ccFraud.gz')
header = fileData.first()

# Drop the header row and convert every field of each record to an integer
rddData = fileData \
    .filter(lambda row: row != header) \
    .map(lambda row: [int(elem) for elem in row.split(',')])
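As a quick sanity check (this snippet is an addition, not from the original post), you can confirm the parse worked by looking at the header line and the first converted record; first() and take() are standard RDD actions:

# Peek at the raw header line and the first parsed record
print(header)
print(rddData.take(1))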

2. Create the schema

import pyspark.sql.types as typ

# One nullable IntegerType field per header column; h[:] copies the header
# text as-is, so any surrounding quote characters stay in the field name
fields = [typ.StructField(h[:], typ.IntegerType(), True)
          for h in header.split(',')]

schema = typ.StructType(fields)
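To see exactly which field names the schema ended up with, you can list them directly (a small check added here, not part of the original post); with a quoted CSV header the names keep their double quotes, which matters for the SQL issue in step 4:

# List the generated column names; expect entries like '"custID"', '"gender"', ...
print([f.name for f in schema.fields])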

3. Create a DataFrame from the RDD

dfData = spark.createDataFrame(rddData, schema)
dfData.printSchema()
root
 |-- "custID": integer (nullable = true)
 |-- "gender": integer (nullable = true)
 |-- "state": integer (nullable = true)
 |-- "cardholder": integer (nullable = true)
 |-- "balance": integer (nullable = true)
 |-- "numTrans": integer (nullable = true)
 |-- "numIntlTrans": integer (nullable = true)
 |-- "creditLine": integer (nullable = true)
 |-- "fraudRisk": integer (nullable = true)

4. Create a temporary view

Here the SQL query against the view fails, while the same thing works fine in spark-shell. What could the reason be?

dfData.createOrReplaceTempView("DataViw")
spark.sql("select gender,count(*) from DataViw group by gender").show()
+-------+
|count()|
+-------+
|      0|
+-------+
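One likely culprit, judging by the printSchema output in step 3, is that the column names still carry the literal double quotes from the CSV header (the field is "gender", quotes included, not gender). Below is a minimal sketch of stripping those quotes when building the schema and rerunning the query; this is my assumption about the cause, not a confirmed fix from the original post:

# Assumption: strip the surrounding quotes from each header column first
fields = [typ.StructField(h.strip('"'), typ.IntegerType(), True)
          for h in header.split(',')]
schema = typ.StructType(fields)

dfData = spark.createDataFrame(rddData, schema)
dfData.createOrReplaceTempView("DataViw")
spark.sql("select gender, count(*) from DataViw group by gender").show()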

2.2 Reading directly into a DataFrame

Schema option 1: every column as IntegerType

import pyspark.sql.types as typ

# Column names in the order they appear in the file
fields = [
    'custID',
    'gender',
    'state',
    'cardholder',
    'balance',
    'numTrans',
    'numIntlTrans',
    'creditLine',
    'fraudRisk',
]

# Every column is declared as a nullable IntegerType
schema = typ.StructType([typ.StructField(f, typ.IntegerType(), True) for f in fields])
fraud_df = spark.read.csv("/pydata/ccFraud.gz", header=True, schema=schema, sep=',')
fraud_df.printSchema()
root
 |-- custID: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- state: integer (nullable = true)
 |-- cardholder: integer (nullable = true)
 |-- balance: integer (nullable = true)
 |-- numTrans: integer (nullable = true)
 |-- numIntlTrans: integer (nullable = true)
 |-- creditLine: integer (nullable = true)
 |-- fraudRisk: integer (nullable = true)
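If you would rather not spell out a schema at all, spark.read.csv can also infer the column types from the data, at the cost of an extra pass over the file. This alternative, and the variable name fraud_inferred, are additions here rather than part of the original post:

# Let Spark infer the column types instead of supplying an explicit schema
fraud_inferred = spark.read.csv("/pydata/ccFraud.gz", header=True, inferSchema=True, sep=',')
fraud_inferred.printSchema()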

Schema option 2: specify the type of each column

import pyspark.sql.types as typ

# (name, type) pairs: custID is read as a string, everything else as an integer
fields = [
    ('custID', typ.StringType()),
    ('gender', typ.IntegerType()),
    ('state', typ.IntegerType()),
    ('cardholder', typ.IntegerType()),
    ('balance', typ.IntegerType()),
    ('numTrans', typ.IntegerType()),
    ('numIntlTrans', typ.IntegerType()),
    ('creditLine', typ.IntegerType()),
    ('fraudRisk', typ.IntegerType()),
]

schema = typ.StructType([
    typ.StructField(name, dtype, True) for name, dtype in fields
])

fraud_df = spark.read.csv("/pydata/ccFraud.gz", header=True, schema=schema, sep=',')
fraud_df.printSchema()
root
 |-- custID: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- state: integer (nullable = true)
 |-- cardholder: integer (nullable = true)
 |-- balance: integer (nullable = true)
 |-- numTrans: integer (nullable = true)
 |-- numIntlTrans: integer (nullable = true)
 |-- creditLine: integer (nullable = true)
 |-- fraudRisk: integer (nullable = true)
fraud_df.show(10)
+------+------+-----+----------+-------+--------+------------+----------+---------+
|custID|gender|state|cardholder|balance|numTrans|numIntlTrans|creditLine|fraudRisk|
+------+------+-----+----------+-------+--------+------------+----------+---------+
|     1|     1|   35|         1|   3000|       4|          14|         2|        0|
|     2|     2|    2|         1|      0|       9|           0|        18|        0|
|     3|     2|    2|         1|      0|      27|           9|        16|        0|
|     4|     1|   15|         1|      0|      12|           0|         5|        0|
|     5|     1|   46|         1|      0|      11|          16|         7|        0|
|     6|     2|   44|         2|   5546|      21|           0|        13|        0|
|     7|     1|    3|         1|   2000|      41|           0|         1|        0|
|     8|     1|   10|         1|   6016|      20|           3|         6|        0|
|     9|     2|   32|         1|   2428|       4|          10|        22|        0|
|    10|     1|   23|         1|      0|      18|          56|         5|        0|
+------+------+-----+----------+-------+--------+------------+----------+---------+
only showing top 10 rows
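With clean column names, the gender aggregation that failed on the RDD-based view in section 2.1 should work here. A short sketch, added for comparison and using a hypothetical view name fraud, with both the DataFrame API and SQL:

# Count rows per gender with the DataFrame API
fraud_df.groupBy('gender').count().show()

# The same aggregation through a temporary view
fraud_df.createOrReplaceTempView("fraud")
spark.sql("select gender, count(*) from fraud group by gender").show()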