弹性式分布数据集RDD——Pyspark基础 (二)

RDD的内部运行方式

RDD不仅是一组不可变的JVM(Java虚拟机)对象的分布集,而且是Spark的核心,可以让任务执行高速运算。

RDD将跟踪(计入日记)应用于每个快的所有转换,以加速计算速度,并在发生错误和部分数据丢失时提供回退(容错机制)。

RDD采用并行的运行方式,也就是每个转换操作并行执行,从而提高速度。
RDD有两种并行操作:

  • 转换操作(返回指向新的RDD的指针)
  • 动作操作(在运行计算后向驱动程序返回值)

数据集的转换通常是惰性的,这也意味着任何转换操作仅在调用数据集上的操作时才执行。该延迟执行会产生风多的精细查询:针对性能进行优化查询。这种优化始于Spark的DAGScheduler——面向阶段的调度器。DAGScheduler负责Stage级的调度详见:Spark运行原理剖析

由于具有单独的RDD转换和动作,DAGScheduler可以在查询中执行优化。包括但不限于避免shuffle数据(最耗费资源的任务)

创建RDD

方式一: 用.parallelize(...)集合(元素list或array)

1
data = sc.parallelize([('a',1),('b',2),('c',3),('d',5),('e',5)])

方式二: 读入外部文件

  • 支持多文件系统中读取:如NTFS、FAT、HFS+(Mac OS Extended),或者如HDFS、S3、Cassandra这类的分布式文件系统,还有其他类文件系统。
  • 指出多种数据格式:如文本、parquet、JSON、Hive tables(Hive表)以及使用JDBC驱动程序可读取的关系数据库中的数据。(注意:Spark可以自动处理压缩数据集)

💡Tip1:读取的方式不同,持有对象表达方式也不同。从文件中读取的数据表示为MapPartitionsRDD;使用集合方法的数据表示为ParallelCollectionRDD

💡Tip2:RDD是无schema的数据结构(和DataFrame不同),所以我们几乎可以混用任何数据结构:tuple、dict、list和spark等都能支持。如果对数据集使用.collect()方法,将把RDD对所有元素返回给驱动程序,驱动程序将其序列化成了一个列表。

1
data_from_file = sc.textFile("hdfs://master:9000/pydata/VS14MORT.txt.gz",4) # 这里表示4个分区
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def extractInformation(row):
import re
import numpy as np

selected_indices = [
2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
19,21,22,23,24,25,27,28,29,30,32,33,34,
36,37,38,39,40,41,42,43,44,45,46,47,48,
49,50,51,52,53,54,55,56,58,60,61,62,63,
64,65,66,67,68,69,70,71,72,73,74,75,76,
77,78,79,81,82,83,84,85,87,89
]
'''
Input record schema
schema: n-m (o) -- xxx
n - position from
m - position to
o - number of characters
xxx - description
1. 1-19 (19) -- reserved positions
2. 20 (1) -- resident status
3. 21-60 (40) -- reserved positions
4. 61-62 (2) -- education code (1989 revision)
5. 63 (1) -- education code (2003 revision)
6. 64 (1) -- education reporting flag
7. 65-66 (2) -- month of death
8. 67-68 (2) -- reserved positions
9. 69 (1) -- sex
10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated
11. 71-73 (3) -- number of units (years, months etc)
12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)
13. 75-76 (2) -- age recoded into 52 categories
14. 77-78 (2) -- age recoded into 27 categories
15. 79-80 (2) -- age recoded into 12 categories
16. 81-82 (2) -- infant age recoded into 22 categories
17. 83 (1) -- place of death
18. 84 (1) -- marital status
19. 85 (1) -- day of the week of death
20. 86-101 (16) -- reserved positions
21. 102-105 (4) -- current year
22. 106 (1) -- injury at work
23. 107 (1) -- manner of death
24. 108 (1) -- manner of disposition
25. 109 (1) -- autopsy
26. 110-143 (34) -- reserved positions
27. 144 (1) -- activity code
28. 145 (1) -- place of injury
29. 146-149 (4) -- ICD code
30. 150-152 (3) -- 358 cause recode
31. 153 (1) -- reserved position
32. 154-156 (3) -- 113 cause recode
33. 157-159 (3) -- 130 infant cause recode
34. 160-161 (2) -- 39 cause recode
35. 162 (1) -- reserved position
36. 163-164 (2) -- number of entity-axis conditions
37-56. 165-304 (140) -- list of up to 20 conditions
57. 305-340 (36) -- reserved positions
58. 341-342 (2) -- number of record axis conditions
59. 343 (1) -- reserved position
60-79. 344-443 (100) -- record axis conditions
80. 444 (1) -- reserve position
81. 445-446 (2) -- race
82. 447 (1) -- bridged race flag
83. 448 (1) -- race imputation flag
84. 449 (1) -- race recode (3 categories)
85. 450 (1) -- race recode (5 categories)
86. 461-483 (33) -- reserved positions
87. 484-486 (3) -- Hispanic origin
88. 487 (1) -- reserved
89. 488 (1) -- Hispanic origin/race recode
'''
record_split = re\
.compile(
r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' +
r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' +
r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +
r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +
r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' +
r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' +
r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' +
r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' +
r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' +
r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' +
r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' +
r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' +
r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' +
r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' +
r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')
try:
rs = np.array(record_split.split(row))[selected_indices]
except:
rs = np.array(['-99'] * len(selected_indices))
return rs

data_file = data_from_file.map(extractInformation)
data_file.map(lambda row: row).take(1)
data_file.cache()
data_file.is_cached
True

全局作用域和局部作用域

Spark可以在两种模式下运行:本地和集群。本地运行Spark代码时和目前使用的python没有说明不同。然而他如果将相同的代码部署到集群,便可能会导致大量的困扰,这就需要了解Spark是怎么在集群上执行工作的。这里有一篇文章介绍的很详细。参考:Spark运行原理详解

在集群模式下,提交任务时任务发送给了Master节点。该驱动程序节点为任务创建DAG,并且决定哪一个执行者(Worker)节点运行特定的任务。然后该驱动程序知识工作者执行它们的任务,并且在结束时将结果返回给驱动程序。然而在这之前,驱动程序为每一个任务的终止做准备:驱动程序中有一组变量和方法,以变工作者在RDD上执行任务。

这组变量和方法在执行者的上下问本质上是静态的,每个执行器从驱动程序中获取的一份变量和方法的副本。这意味着运行任务时,如果执行者改变这些变量或覆盖这些方法,它不影响任何其他执行者的副本或者驱动程序的变量和方法。这可能会导致一些意想不到的行为和运行错误,这些行为和错误通常都很难被追踪到。

转换

转换操作可以调整数据集。包括映射、筛选、链接、转换数据集中的值。

.map()转换

1
2
data_2014 = data_file.map(lambda x: x[16])
data_2014.take(10)
['2014', '2014', '2014', '2014', '2014', '2014', '2014', '2014', '2014', '-99']

.filter()转换

1
2
3
data_filter = data_file.filter(lambda x: x[16] == '2014' and x[21] == '0')
print(data_filter.count())
data_file.take(2)
22





[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40'),
 array(['1', '  ', '2', '1', '01', 'M', '1', '058', ' ', '37', '17', '08',
        '  ', '4', 'D', '3', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I250',
        '214', '062', '   ', '21', '03', '11I250 ', '61I272 ', '62E669 ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '03',
        'I250 ', 'E669 ', 'I272 ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

.flatMap()转换

.flatMap()方法和.map()工作类似,不同的是flatMap()返回一个扁平的结果而不是一个列表。

1
2
data_flat = data_file.flatMap(lambda x: (x[16], int(x[16])+1))
data_flat.take(10)
['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]

.flatMap()可以用于过滤一些格式不正确的记录。在这个机制下,.flatMap()方法吧每一行看作一个列表对待,然后将所有记录简单的加入到一起,通过传递一个空列表可以丢弃格式不正确的记录。

.distinct()转换

这里用该方法检查性别列表是否只包含了男性和女性验证我们是否准确解释了数据集。

1
2
distinct_gender = data_file.map(lambda x: x[5]).distinct()
distinct_gender.collect()
['M', 'F', '-99']

.sample() 转换

该方法返回数据集的随机样本。第一个参数withReplacement指定采样是否应该替换,第二个参数fraction定义返回数据量的百分比,第三个参数是伪随机数产生器的种子seed

为了节省运算时间,这里选取愿数据千分之一的随机数据作为下面的练习数据。

1
2
data_sample = data_file.sample(False, 0.001, 666)
data_sample.cache()
PythonRDD[25] at RDD at PythonRDD.scala:48

.leftOuterJoin()转换

  • .leftOuterJoin(): 根据两个数据集中都有得值来连接两个RDD,并返回左侧的RDD记录,而右边的记录副加载两个RDD匹配的地方。
  • .join() :只返回两个RDD之间的关联数值
  • .intersection():返回两个RDD中相等的记录
1
2
3
4
5
rdd1 = sc.parallelize([('a',1), ('b',4), ('c',10)])
rdd2 = sc.parallelize([('a',4), ('a',1), ('b',6), ('d',15)])
print("leftOuterJoin: ",rdd1.leftOuterJoin(rdd2).collect())
print("Join: ",rdd1.join(rdd2).collect())
print("intersection: ", rdd1.intersection(rdd2).collect())
leftOuterJoin:  [('c', (10, None)), ('b', (4, 6)), ('a', (1, 1)), ('a', (1, 4))]
Join:  [('b', (4, 6)), ('a', (1, 1)), ('a', (1, 4))]
intersection:  [('a', 1)]

.repartition()转换

重新对数据集进行分区,改变数据集分赛区的数量。此功能应该谨慎并且仅当真正需要的时候使用,因为它会充足数据,导致性能产生巨大的影响。

1
2
3
print(len(rdd2.glom().collect()))
rdd2 = rdd2.repartition(4)
print(len(rdd2.glom().collect()))
3
4

动作

.collect() 动作

返回所有RDD的元素给驱动程序

💡同时常用的还有: .collectAsMap()方法

.take() 动作

可以说这事最有用的方法,返回单个数据分区的前n行。

1
2
3
rdd.take(1)
#等同于:
rdd.first()

.reduce() 动作

该方法使用指定的方法减少RDD中的元素。可以用该方法计算RDD总的元素之和:

1
rdd1.map(lambda x: x[1]).reduce(lambda x, y: x + y)

在每一个分区里,reduce()方法运行求和方法,将改总和返回给最终聚合所在的程序节点。

⚠️警告:
要谨慎注意的是,reduce传递的函数需要时关联的,既满足元素顺序改变结果不变,操作符顺序改变结果不变。如:

1
2
3
4
rdd = sc.parallelize([1, 2, 0.5, 0.1],1)
rdd.reduce(lambda x, y: x / y)

out: 10.0
1
2
3
4
rdd = sc.parallelize([1, 2, 0.5, 0.1],2)
rdd.reduce(lambda x, y: x / y)

out: 0.1

这里我们希望输出结果是10.0,第一个只把RDD放在一个分区,输出结果符合预期。但是在第二个例子中,分了2个区,结果就不对了。因为该方法是在每个分区并行计算的。

.reduceByKey() 动作

该方法和.reduce()方法类似,但是实在key-key基础上运行:

1
2
data_key = sc.parallelize([('a',3), ('a',1), ('b',6), ('d',1), ('b',6), ('d',15), ('d',3), ('a',7), ('b', 8)],4)
data_key.reduceByKey(lambda x, y: x+y).collect()
[('b', 20), ('a', 11), ('d', 19)]

.count() 动作

.count() 方法统计出了RDD里所有的元素数量。

1
rdd.count()

.count() 方法产生入戏方法同样的结果,但不需要把整个数据集移动到驱动程序:

1
len(rdd.collect()). # ⚠️警告:不要这样做!!

.countByKey() 动作

如果数据集是Ket-Value形式,可以使用.countByKey()方法

1
data_key.countByKey().items()
dict_items([('a', 3), ('b', 3), ('d', 3)])

.saveAsTextFile() 动作

该方法将RDD保存为文本文件:每个文件一个分区

1
data_key.saveAsTextFile('hdfs://master:9000/out/data_key.txt')

要读取它的时候需要解析,因为所有行都被视为字符串:

1
2
3
4
5
6
7
def parseInput(row):
import re
pattern = re.compile(r"\(\'([a-z]+)\',.([0-9]+)\)") # 这里“+”号代表匹配一个或多个匹配字符,否则针对双位数动作操作会报错
row_split = pattern.split(row)
return (row_split[1], row_split[2])
data_key_read = sc.textFile('hdfs://master:9000/out/data_key.txt')
data_key_read.map(parseInput).collect()
[('a', '3'),
 ('a', '1'),
 ('b', '6'),
 ('d', '1'),
 ('b', '6'),
 ('d', '15'),
 ('d', '3'),
 ('a', '7'),
 ('b', '8')]

💡同时还有:

  • rdd.saveAsHadoopDataset
  • rdd.saveAsSequenceFile

  • 等方法

.foreach() 动作

这个方法对RDD里的每个元素,用迭代方法应用相同的函数;和.map()相比,.foreach()方法按照一个接一个的方式,对每一条记录应用一个定义好的函数。当希望将数据曹村道PySpark本身不支持的数据库是,该方法很有用。

1
2
3
4
def f(x):
print(x)

rdd.foreach(f)

小结:

  • RDD是Spark的核心;这些无schema数据结构早Spark中处理的最基本的数据结构。
  • RDD的两种创建方式: parallelize 和 文件读取
  • Spark中的转化是惰性的,只在操作被调用时应用。
  • Scala 和 Python RDD之间一个主要的区别是速度: Python RDD 比 Scala 慢很多!