弹性分布式数据集

弹性分布式数据集(RDD:Resilient Distributed Dataset)是Apache Spark的核心,它指的是一组不可变Java虚拟机(JVM)对象的分布集,可以执行快速运算。该数据集有两个特点:一个特点是分布式,意味着数据集将被划分成块,并被分发到执行节点上,从而可以对这个数据集执行分布式快速计算;第二个特点是该数据集跟踪/记录每个数据块的所有转换,不仅可以加快计算速度,而且可以在发生错误和部分数据丢失的情况下提供回退操作,这样该数据集就可以重新计算数据。

在终端启动pyspark之后会自动创建SparkContext对象(别称为sc)和SQLContext(别称为sqlContext)。

创建RDD

在pyspark中,有两种方式创建RDD,一种是用并行化一个列表

1
2
3
4
data = sc.parallelize(
[('Amber', 22), ('Alfred', 23), ('Skye',4),('Albert',12),
('Amber',2)])
data
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:489

另一种是读取一个文件或者数据库中

1
2
data_from_file = sc.textFile('/media/seisinv/Data/course/spark/notebook/VS14MORT.txt.gz',4)
data_from_file
/media/seisinv/Data/course/spark/notebook/VS14MORT.txt.gz MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

其中最后一个参数表示该数据集被划分的分区个数。

  • 需要注意的是,两种创建RDD的方式返回的对象类型不同
  • Spark不仅可以支持不同文件系统,包括本地文件系统,如NTFS,FAT等,和分布式文件系统,如HDFS,S3,Cassandra。

Schema

RDD是一种无Schema的数据结构,所以可以混合使用任何类型的数据结构,例如元组、字典、列表等。

对数据集使用\(collect()\)方法,相当于把该数据集送回驱动程序,可以访问对象中的数据,和在Python中一样。

1
2
3
4
data_heterogenous = sc.parallelize(
[('Ferrari', 'fast'), {'Porsche': 100000},
['Spain','visited', 4504]]).collect()
data_heterogenous[1]['Porsche']
100000

读取文件

从文本文件中读取数据,文件中的每一行形成RDD的一个元素。

1
data_from_file.take(1)
['                   1                                          2101  M1087 432311  4M4                2014U7CN                                    I64 238 070   24 0111I64                                                                                                                                                                           01 I64                                                                                                  01  11                                 100 601']

自定义函数

既可以使用\(lambda\)表达式,也可以使用函数,从RDD元素中提取信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def extractInformation(row):
import re
import numpy as np

selected_indices = [
2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
19,21,22,23,24,25,27,28,29,30,32,33,34,
36,37,38,39,40,41,42,43,44,45,46,47,48,
49,50,51,52,53,54,55,56,58,60,61,62,63,
64,65,66,67,68,69,70,71,72,73,74,75,76,
77,78,79,81,82,83,84,85,87,89
]

'''
Input record schema
schema: n-m (o) -- xxx
n - position from
m - position to
o - number of characters
xxx - description
1. 1-19 (19) -- reserved positions
2. 20 (1) -- resident status
3. 21-60 (40) -- reserved positions
4. 61-62 (2) -- education code (1989 revision)
5. 63 (1) -- education code (2003 revision)
6. 64 (1) -- education reporting flag
7. 65-66 (2) -- month of death
8. 67-68 (2) -- reserved positions
9. 69 (1) -- sex
10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated
11. 71-73 (3) -- number of units (years, months etc)
12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)
13. 75-76 (2) -- age recoded into 52 categories
14. 77-78 (2) -- age recoded into 27 categories
15. 79-80 (2) -- age recoded into 12 categories
16. 81-82 (2) -- infant age recoded into 22 categories
17. 83 (1) -- place of death
18. 84 (1) -- marital status
19. 85 (1) -- day of the week of death
20. 86-101 (16) -- reserved positions
21. 102-105 (4) -- current year
22. 106 (1) -- injury at work
23. 107 (1) -- manner of death
24. 108 (1) -- manner of disposition
25. 109 (1) -- autopsy
26. 110-143 (34) -- reserved positions
27. 144 (1) -- activity code
28. 145 (1) -- place of injury
29. 146-149 (4) -- ICD code
30. 150-152 (3) -- 358 cause recode
31. 153 (1) -- reserved position
32. 154-156 (3) -- 113 cause recode
33. 157-159 (3) -- 130 infant cause recode
34. 160-161 (2) -- 39 cause recode
35. 162 (1) -- reserved position
36. 163-164 (2) -- number of entity-axis conditions
37-56. 165-304 (140) -- list of up to 20 conditions
57. 305-340 (36) -- reserved positions
58. 341-342 (2) -- number of record axis conditions
59. 343 (1) -- reserved position
60-79. 344-443 (100) -- record axis conditions
80. 444 (1) -- reserve position
81. 445-446 (2) -- race
82. 447 (1) -- bridged race flag
83. 448 (1) -- race imputation flag
84. 449 (1) -- race recode (3 categories)
85. 450 (1) -- race recode (5 categories)
86. 461-483 (33) -- reserved positions
87. 484-486 (3) -- Hispanic origin
88. 487 (1) -- reserved
89. 488 (1) -- Hispanic origin/race recode
'''

record_split = re\
.compile(
r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' +
r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' +
r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +
r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +
r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' +
r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' +
r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' +
r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' +
r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' +
r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' +
r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' +
r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' +
r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' +
r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' +
r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')
try:
rs = np.array(record_split.split(row))[selected_indices]
except:
rs = np.array(['-99'] * len(selected_indices))
return rs
# return record_split.split(row)
1
2
data_from_file_conv = data_from_file.map(extractInformation)
data_from_file_conv.map(lambda row: row).take(1)
[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'],
       dtype='<U40')]

转换

\(map\)转换

\(map\)方法应用在每个RDD元素上。

1
2
data_2014 = data_from_file_conv.map(lambda row: int(row[16]))
data_2014.take(10)
[2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, -99]
1
2
data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))
data_2014_2.take(10)
[('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('-99', -99)]

\(filter\)转换

该方法可以从数据集中选择元素,并符合特定的标准。例如在上面的例子中统计2014年死于车祸的人数

1
2
3
data_filtered = data_from_file_conv.filter(
lambda row: row[16]=='2014' and row[21]=='0')
data_filtered.count()
22

\(flatMap\)转换

\(map\)方法类似,但是返回一个扁平的结果,而不是一个列表,而且可以用于过滤一些格式不正确的记录。通过传递一个空列表,可以丢弃格式不正确的记录。

1
2
data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16],int(row[16])+1))
data_2014_flat.take(10)
['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]

\(distinct\)转换

用于选择指定列中不同的值,并以列表的形式返回。例如检查上面那个数据集中是否只包含男性和女性,从而检查数据集解析是否正确。

1
2
distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct()
distinct_gender.collect()
['-99', 'M', 'F']

\(sample\)转换

该方法返回数据集的随机样本。第一个参数表示采样是否应该被替换,第二个参数表示随机返回的比例,第三个参数表示伪随机数产生器的种子。

1
2
3
data_sample = data_from_file_conv.sample(False, 0.1, 666)
print("Original dataset: {0}, sample: {1}"\
.format(data_from_file_conv.count(), data_sample.count()))
Original dataset: 2631171, sample: 263247

\(leftOuterJoin\)转换

根据两个数据集中都有的值来连接两个RDD,并返回左侧的RDD记录,而右侧的记录附在两个RDD匹配的地方。需要注意的是,这是一个高开销的方法,应该谨慎使用。

1
2
3
4
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c', 10)])
rdd2 = sc.parallelize([('a', 4), ('a',1), ('b', '6'), ('d', 15)])
rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.take(5)
[('a', (1, 4)), ('a', (1, 1)), ('c', (10, None)), ('b', (4, '6'))]
1
2
rdd4 = rdd1.join(rdd2)
rdd4.take(5)
[('a', (1, 4)), ('a', (1, 1)), ('b', (4, '6'))]
1
2
rdd5 = rdd1.intersection(rdd2)
rdd5.take(5)
[('a', 1)]

\(repartition\)转换

重新对数据集进行分区,改变数据集分区的数量。同样的,因为它重组了数据,因此开销很大。

1
2
rdd1 = rdd1.repartition(4)
len(rdd1.glom().collect())
4

操作

和转换不同,操作执行数据集上的计划任务。

\(take\)方法

这可能是最有用的方法,该方法只返回单个数据分区的前n行,对比之下,\(collect\)方法返回的是整个RDD。如果想要一些随机记录,可以使用\(takeSample\).

1
data_first = data_from_file_conv.take(1)
1
data_take_smapled = data_from_file_conv.takeSample(False, 1, 667)

\(collect\)方法

将所有RDD的元素返回给驱动程序。

\(reduce\)方法

使用指定的方法减少RDD中的元素。例如计算总的元素数量。需要注意的是,传递给\(reduce\)的函数需要在元素顺序改变和操作符顺序改变的情况下都不改变结果

1
2
3
data_reduce = sc.parallelize([1, 2, 0.5, 0.1, 5, 0.2], 1)
works = data_reduce.reduce(lambda x, y: x+y)
works
8.799999999999999
1
2
3
data_reduce = sc.parallelize([1, 2, 0.5, 0.1, 5, 0.2], 1)
divs = data_reduce.reduce(lambda x, y: x/y)
divs
10.0
1
2
3
data_reduce = sc.parallelize([1, 2, 0.5, 0.1, 5, 0.2], 4)
divs = data_reduce.reduce(lambda x, y: x/y)
divs
0.1

\(reduceByKey\)\(reduce\)方法类似,不同在于它是在键-键匹配的基础上进行操作。

1
2
3
4
data_key = sc.parallelize(
[('a', 4), ('a',1), ('b', 6), ('d', 15),
('a', 5), ('d', 1)],4)
data_key.reduceByKey(lambda x, y: x+y).collect()
[('b', 6), ('a', 10), ('d', 16)]

\(count\)方法

统计RDD中元素的总数

1
data_reduce.count()
6

该方法和下面的方法产生同样的结果,但是不需要把整个数据集移动到驱动程序

1
len(data_reduce.collect())
6

如果数据是key-value形式,可以使用\(countByKey\)方法获取不同键的数量

1
data_key.countByKey().items()
dict_items([('a', 3), ('b', 1), ('d', 2)])

\(saveAsTextFile\)方法

将RDD保存为文本文件,每个文件一个分区

1
data_key.saveAsTextFile('/media/seisinv/Data/course/spark/notebook/data_key.txt')

要读取这个文件,需要解析它,因为所有的行都被视为字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
def parseInput(row):
import re

pattern = re.compile(r'\(\'([a-z])\', ([0-9])\)')
row_split = pattern.split(row)

return (row_split[0], int(row_split[1]))

data_key_reread = sc \
.textFile('/media/seisinv/Data/course/spark/notebook/data_key.txt') \
.map(parseInput)
#data_key_reread.collect()
#data_key_reread.take(5)

\(foreach\)方法

对RDD的每个元素,用迭代的方法应用相同的函数,和\(map\)比较,该方法按照一个接一个的方式处理,当需要将数据保存到pyspark不支持的数据库时,该方法很有用。

1
2
3
def f(x):
print(x)
data_key.foreach(f)

小结

  1. RDD是Spark的核心,是最基本的数据结构
  2. Spark中的转换是惰性的,只有在调用操作之后才真正起作用
  3. Python的RDD比Scala慢很多,因此引入下一种数据结构:DataFrame

参考资料

  • PySpark实战指南,Drabas and Lee著, 栾云杰等译