PySpark Data Preprocessing (1)

1.1 Duplicate values

df = spark.createDataFrame([
    (1, 144.5, 5.9, 33, 'M'),
    (2, 167.2, 5.4, 45, 'M'),
    (3, 124.1, 5.2, 23, 'F'),
    (4, 144.5, 5.9, 33, 'M'),
    (5, 133.2, 5.7, 54, 'F'),
    (3, 124.1, 5.2, 23, 'F'),
    (5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])
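
All of the snippets below assume an active SparkSession bound to the name spark (the pyspark shell and notebooks create it for you). If you run the code as a standalone script, a minimal local setup might look like this sketch (the app name is arbitrary):

from pyspark.sql import SparkSession

# Minimal local SparkSession so that spark.createDataFrame(...) above works
spark = SparkSession.builder \
    .appName('preprocessing-demo') \
    .master('local[*]') \
    .getOrCreate()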

1.1.1 Checking for duplicates

print("Total rows: {}".format(df.count()))
print("Duplicate rows: {}".format(df.count() - df.distinct().count()))
Total rows: 7
Duplicate rows: 1
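
Counting only tells you how many rows are duplicated, not which ones. One possible way to inspect the duplicated rows themselves (a sketch, not part of the original recipe) is to group on every column and keep the groups that occur more than once:

# Any group with count > 1 is a fully duplicated row
df.groupBy(df.columns) \
    .count() \
    .filter('count > 1') \
    .show()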

1.1.2 Dropping duplicates

df1 = df.dropDuplicates()
df1.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  5| 129.2|   5.3| 42|     M|
|  1| 144.5|   5.9| 33|     M|
|  4| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
+---+------+------+---+------+

1.1.3 Finding duplicates with the ID column excluded

print("Rows in df1: {}".format(df1.count()))
print("Rows in df1 after de-duplication: {}"\
    .format(
        df1.select([c for c in df1.columns if c != 'id'])
        .distinct()
        .count()))
Rows in df1: 6
Rows in df1 after de-duplication: 5
df2 = df1.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
df2.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  4| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+

1.1.4 Checking for duplicate IDs

import pyspark.sql.functions as fn

df2.agg(
    fn.count('id').alias('count'),
    fn.countDistinct('id').alias('distinct'))\
    .show()
+-----+--------+
|count|distinct|
+-----+--------+
|    5|       4|
+-----+--------+

Here each row needs to be assigned a new unique ID. The fn.monotonically_increasing_id() method gives every record an ID that is unique and monotonically increasing; the IDs are guaranteed to be unique as long as the data is spread across fewer than roughly 1 billion partitions and each partition holds fewer than roughly 8 billion records.

df3 = df2.withColumn('new_id', fn.monotonically_increasing_id())
df3.show()
+---+------+------+---+------+-------------+
| id|weight|height|age|gender|       new_id|
+---+------+------+---+------+-------------+
|  5| 133.2|   5.7| 54|     F|  25769803776|
|  1| 144.5|   5.9| 33|     M| 171798691840|
|  2| 167.2|   5.4| 45|     M| 592705486848|
|  3| 124.1|   5.2| 23|     F|1236950581248|
|  5| 129.2|   5.3| 42|     M|1365799600128|
+---+------+------+---+------+-------------+
df3.printSchema()
root
 |-- id: long (nullable = true)
 |-- weight: double (nullable = true)
 |-- height: double (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- new_id: long (nullable = false)
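
The generated IDs are not consecutive: the current implementation packs the partition ID into the upper 31 bits of the 64-bit value and the record number within the partition into the lower 33 bits. A small sketch in plain Python that decodes the first new_id shown above:

new_id = 25769803776                          # new_id of the first row above
partition_id = new_id >> 33                   # upper bits -> partition 3
row_in_partition = new_id & ((1 << 33) - 1)   # lower 33 bits -> record 0
print(partition_id, row_in_partition)         # 3 0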

1.2 Missing values

df_miss = spark.createDataFrame([
    (1, 143.5, 5.6, 28, 'M', 100000),
    (2, 167.2, 5.4, 45, 'M', None),
    (3, None, 5.2, None, None, None),
    (4, 144.5, 5.9, 33, 'M', None),
    (5, 133.2, 5.7, 54, 'F', None),
    (6, 124.1, 5.2, None, 'F', None),
    (7, 129.2, 5.3, 42, 'M', 76000),
], ['id', 'weight', 'height', 'age', 'gender', 'income'])

1.2.1 Finding missing values

df_miss.rdd.map(
    lambda x: (x['id'], sum([c is None for c in x]))
).collect()
[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]
df_miss.where('id == 3').show()
+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  3|  null|   5.2|null|  null|  null|
+---+------+------+----+------+------+

1.2.2 Checking the percentage of missing values in each column

df_miss.agg(*[
    (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing')
    for c in df_miss.columns
]).show()
+----------+------------------+--------------+------------------+------------------+------------------+
|id_missing|    weight_missing|height_missing|       age_missing|    gender_missing|    income_missing|
+----------+------------------+--------------+------------------+------------------+------------------+
|       0.0|0.1428571428571429|           0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
+----------+------------------+--------------+------------------+------------------+------------------+
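
If you want to act on these percentages programmatically, for example to decide which columns to drop in the next step, one possible sketch collects them into a Python dict first (the 0.7 cutoff is an arbitrary assumption, not part of the original recipe):

# Collect per-column missing ratios into a plain dict
missing = df_miss.agg(*[
    (1 - (fn.count(c) / fn.count('*'))).alias(c)
    for c in df_miss.columns
]).toPandas().to_dict('records')[0]

# Keep only columns with at most 70% missing values (cutoff is an assumption)
keep_cols = [c for c, ratio in missing.items() if ratio <= 0.7]
df_kept = df_miss.select(keep_cols)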

1.2.3 Removing features with too many missing values

# Option 1: drop the income column explicitly
df_miss1 = df_miss.select([
    c for c in df_miss.columns if c != 'income'
])
df_miss1.show()
+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  3|  null|   5.2|null|  null|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+
# Option 2: set a threshold; dropna(thresh=3) keeps only rows with at least
# 3 non-null values, so a higher threshold filters more aggressively
df_miss2 = df_miss.dropna(thresh=3)
df_miss2.show()
+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  1| 143.5|   5.6|  28|     M|100000|
|  2| 167.2|   5.4|  45|     M|  null|
|  4| 144.5|   5.9|  33|     M|  null|
|  5| 133.2|   5.7|  54|     F|  null|
|  6| 124.1|   5.2|null|     F|  null|
|  7| 129.2|   5.3|  42|     M| 76000|
+---+------+------+----+------+------+

1.2.4 Filling missing values (a very handy method)

An efficient approach is to call df.fillna() and pass it a dictionary mapping each column to its fill value.

# Fill with the mean: first build a dictionary of per-column means
means = df_miss1.agg(
    *[fn.mean(c).alias(c) for c in df_miss1.columns if c != 'gender'])\
    .toPandas().to_dict('records')[0]
means['gender'] = 'missing'

df_miss1.fillna(means).show()
+---+------------------+------+---+-------+
| id|            weight|height|age| gender|
+---+------------------+------+---+-------+
|  1|             143.5|   5.6| 28|      M|
|  2|             167.2|   5.4| 45|      M|
|  3|140.28333333333333|   5.2| 40|missing|
|  4|             144.5|   5.9| 33|      M|
|  5|             133.2|   5.7| 54|      F|
|  6|             124.1|   5.2| 40|      F|
|  7|             129.2|   5.3| 42|      M|
+---+------------------+------+---+-------+
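
As a side note, mean (or median) imputation of numeric columns can also be expressed with pyspark.ml.feature.Imputer, which plugs into an ML Pipeline. A minimal sketch, assuming only weight needs to be imputed (older Spark versions require Double/Float input columns):

from pyspark.ml.feature import Imputer

# Mean-impute weight; null values in the input column are treated as missing
imputer = Imputer(
    strategy='mean',
    inputCols=['weight'],
    outputCols=['weight_imputed'])
imputer.fit(df_miss1).transform(df_miss1).show()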

1.3 Outliers

1.3.1 Defining the outlier range

IQR (interquartile range): defined as the difference between the upper quartile and the lower quartile.

df_outliers = spark.createDataFrame([
    (1, 143.5, 5.3, 28),
    (2, 154.2, 5.5, 45),
    (3, 342.3, 5.1, 99),
    (4, 144.5, 5.5, 33),
    (5, 133.2, 5.4, 54),
    (6, 124.1, 5.1, 21),
    (7, 129.2, 5.3, 42),
], ['id', 'weight', 'height', 'age'])

Compute the lower and upper cutoff points for each feature with the .approxQuantile(...) method:

  • The first argument specifies the column name
  • The second argument is a list of quantile probabilities, each between 0 and 1 (0.5 gives the median)
  • The third argument specifies the acceptable relative error for each metric (if set to 0, the exact quantile is computed, which can be expensive on large datasets)
cols = ['weight', 'height', 'age']
bounds = {}

for col in cols:
    quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.01)
    IQR = quantiles[1] - quantiles[0]
    bounds[col] = [quantiles[0] - 0.4 * IQR, quantiles[1] + 0.4 * IQR]

bounds
# Adjust the cutoff multiplier (0.4 here; 1.5 x IQR is the common textbook default)
# to the actual business case
{'age': [17.6, 64.4],
 'height': [4.9399999999999995, 5.66],
 'weight': [119.19999999999999, 164.2]}
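
As a quick sanity check of the weight bounds, assuming approxQuantile returned 129.2 and 154.2 for the 0.25 and 0.75 quantiles (consistent with the bounds printed above):

q1, q3 = 129.2, 154.2
iqr = q3 - q1               # 25.0
lower = q1 - 0.4 * iqr      # 129.2 - 10.0 = 119.2
upper = q3 + 0.4 * iqr      # 154.2 + 10.0 = 164.2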

1.3.2 Flagging outliers

outliers = df_outliers.select(
    *['id'] + [
        ((df_outliers[c] < bounds[c][0]) |
         (df_outliers[c] > bounds[c][1])).alias(c + '_o')
        for c in cols
    ])
outliers.show()
+---+--------+--------+-----+
| id|weight_o|height_o|age_o|
+---+--------+--------+-----+
|  7|   false|   false|false|
|  6|   false|   false|false|
|  5|   false|   false|false|
|  1|   false|   false|false|
|  3|    true|   false| true|
|  2|   false|   false|false|
|  4|   false|   false|false|
+---+--------+--------+-----+

1.3.3 Inspecting the outliers

bad_outlier = df_outliers.join(outliers, on='id')
bad_outlier.filter('weight_o').select('id', 'weight').show()
bad_outlier.filter('age_o').select('id', 'age').show()
+---+------+
| id|weight|
+---+------+
|  3| 342.3|
+---+------+

+---+---+
| id|age|
+---+---+
|  3| 99|
+---+---+
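
With the flags joined back in, a typical last step is to drop (or handle separately) the flagged rows. A minimal sketch, assuming any row flagged on weight, height, or age should be removed:

# Keep only rows that are not flagged as outliers on any feature
df_clean = bad_outlier.filter(
    'NOT weight_o AND NOT height_o AND NOT age_o'
).select('id', 'weight', 'height', 'age')
df_clean.show()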