Introduction to PySpark + MLlib

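This article shows how to use the MLlib package in PySpark to load and transform data, analyze and extract features, and run logistic regression and random forest algorithms. It then applies these steps to predicting infant survival.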

Loading and transforming the data

Define the schema of the dataset

import pyspark.sql.types as typ

labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.StringType()),
    ('BIRTH_YEAR', typ.IntegerType()),
    ('BIRTH_MONTH', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('MOTHER_RACE_6CODE', typ.StringType()),
    ('MOTHER_EDUCATION', typ.StringType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('FATHER_EDUCATION', typ.StringType()),
    ('MONTH_PRECARE_RECODE', typ.StringType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_BMI_RECODE', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.StringType()),
    ('DIABETES_GEST', typ.StringType()),
    ('HYP_TENS_PRE', typ.StringType()),
    ('HYP_TENS_GEST', typ.StringType()),
    ('PREV_BIRTH_PRETERM', typ.StringType()),
    ('NO_RISK', typ.StringType()),
    ('NO_INFECTIONS_REPORTED', typ.StringType()),
    ('LABOR_IND', typ.StringType()),
    ('LABOR_AUGM', typ.StringType()),
    ('STEROIDS', typ.StringType()),
    ('ANTIBIOTICS', typ.StringType()),
    ('ANESTHESIA', typ.StringType()),
    ('DELIV_METHOD_RECODE_COMB', typ.StringType()),
    ('ATTENDANT_BIRTH', typ.StringType()),
    ('APGAR_5', typ.IntegerType()),
    ('APGAR_5_RECODE', typ.StringType()),
    ('APGAR_10', typ.IntegerType()),
    ('APGAR_10_RECODE', typ.StringType()),
    ('INFANT_SEX', typ.StringType()),
    ('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()),
    ('INFANT_WEIGHT_GRAMS', typ.IntegerType()),
    ('INFANT_ASSIST_VENTI', typ.StringType()),
    ('INFANT_ASSIST_VENTI_6HRS', typ.StringType()),
    ('INFANT_NICU_ADMISSION', typ.StringType()),
    ('INFANT_SURFACANT', typ.StringType()),
    ('INFANT_ANTIBIOTICS', typ.StringType()),
    ('INFANT_SEIZURES', typ.StringType()),
    ('INFANT_NO_ABNORMALITIES', typ.StringType()),
    ('INFANT_ANCEPHALY', typ.StringType()),
    ('INFANT_MENINGOMYELOCELE', typ.StringType()),
    ('INFANT_LIMB_REDUCTION', typ.StringType()),
    ('INFANT_DOWN_SYNDROME', typ.StringType()),
    ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),
    ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()),
    ('INFANT_BREASTFED', typ.StringType())
]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])

Load the data

# spark is assumed to be an existing SparkSession (as in the pyspark shell)
births = spark.read.csv('births_train.csv.gz',
                        header=True,
                        schema=schema)

Define the recoding dictionary

recode_dictionary = {
    'YNU': {
        'Y': 1,
        'N': 0,
        'U': 0
    }
}

Select the attributes relevant to infant survival

selected_features = [
    'INFANT_ALIVE_AT_REPORT',
    'BIRTH_PLACE',
    'MOTHER_AGE_YEARS',
    'FATHER_COMBINED_AGE',
    'CIG_BEFORE',
    'CIG_1_TRI',
    'CIG_2_TRI',
    'CIG_3_TRI',
    'MOTHER_HEIGHT_IN',
    'MOTHER_PRE_WEIGHT',
    'MOTHER_DELIVERY_WEIGHT',
    'MOTHER_WEIGHT_GAIN',
    'DIABETES_PRE',
    'DIABETES_GEST',
    'HYP_TENS_PRE',
    'HYP_TENS_GEST',
    'PREV_BIRTH_PRETERM'
]

births_trimmed = births.select(selected_features)

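Two recodings are applied:
- Yes/No/Unknown values are encoded as 1/0/0 respectively;
- cigarette counts: 0 means the mother did not smoke before or during pregnancy, 1 to 97 is the actual number of cigarettes, 98 means 98 or more, and 99 means the number is unknown. The unknown code 99 is recoded as 0.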

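import pyspark.sql.functions as func

# map a raw code (e.g. 'Y', 'N', 'U') to its integer value under the chosen scheme
def recode(col, key):
    return recode_dictionary[key][col]

# keep cigarette counts unchanged, but replace the "unknown" code 99 with 0
def correct_cig(feat):
    return func \
        .when(func.col(feat) != 99, func.col(feat)) \
        .otherwise(0)

# udf wraps a Python function so Spark can apply it to columns; the second argument is the return type
rec_integer = func.udf(recode, typ.IntegerType())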
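To make the two recoding rules concrete without a Spark cluster, here is a minimal plain-Python sketch of the same logic applied to single values. The helper names `recode_ynu` and `correct_cig_value` and the sample row are hypothetical. They only mirror what `recode` and `correct_cig` do to each value of a column.

```python
# Same scheme as recode_dictionary: Y -> 1, N -> 0, U -> 0
RECODE_DICTIONARY = {'YNU': {'Y': 1, 'N': 0, 'U': 0}}


def recode_ynu(value, key='YNU'):
    """Map a raw Y/N/U code to its integer value (raises KeyError for other codes)."""
    return RECODE_DICTIONARY[key][value]


def correct_cig_value(count):
    """Keep 0-98 unchanged and replace the 'unknown' code 99 with 0."""
    return count if count != 99 else 0


# hypothetical raw row
row = {'DIABETES_PRE': 'U', 'CIG_BEFORE': 99, 'CIG_1_TRI': 98}
cleaned = {
    'DIABETES_PRE': recode_ynu(row['DIABETES_PRE']),
    'CIG_BEFORE': correct_cig_value(row['CIG_BEFORE']),
    'CIG_1_TRI': correct_cig_value(row['CIG_1_TRI']),
}
print(cleaned)  # {'DIABETES_PRE': 0, 'CIG_BEFORE': 0, 'CIG_1_TRI': 98}
```

Note that 98 ("98 or more") is kept as a number, so heavy smokers are slightly under-counted rather than dropped.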

Correct the features related to the number of cigarettes smoked

# withColumn takes the column name first and the expression that produces it second;
# reusing an existing name replaces that column
births_transformed = births_trimmed \
    .withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE')) \
    .withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI')) \
    .withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI')) \
    .withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))

Correct the Y/N/U features. First, find the string features whose values include 'Y'.

cols = [(col.name, col.dataType) for col in births_trimmed.schema]

YNU_cols = []

for i, s in enumerate(cols):
    if s[1] == typ.StringType():
        # collect the distinct values of this string column
        dis = births.select(s[0]) \
            .distinct() \
            .rdd \
            .map(lambda row: row[0]) \
            .collect()

        if 'Y' in dis:
            YNU_cols.append(s[0])

print(YNU_cols)
['INFANT_ALIVE_AT_REPORT', 'DIABETES_PRE', 'DIABETES_GEST', 'HYP_TENS_PRE', 'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM']
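The detection loop above can be sketched in plain Python over a list of dict rows. The rows and the helpers `find_ynu_columns` and `recode_rows` are hypothetical. The toy data shows one consequence of this approach: a Y/N/U column whose data happens to contain no 'Y' (here `DIABETES_PRE`) is not detected.

```python
def find_ynu_columns(rows, string_columns):
    """Return the string columns whose distinct values contain 'Y'."""
    ynu_cols = []
    for name in string_columns:
        distinct = {row[name] for row in rows}  # like select(name).distinct()
        if 'Y' in distinct:
            ynu_cols.append(name)
    return ynu_cols


def recode_rows(rows, ynu_cols, mapping=None):
    """Recode the detected Y/N/U columns of every row; leave other columns untouched."""
    mapping = mapping or {'Y': 1, 'N': 0, 'U': 0}
    return [
        {name: mapping[value] if name in ynu_cols else value
         for name, value in row.items()}
        for row in rows
    ]


# hypothetical rows
rows = [
    {'INFANT_ALIVE_AT_REPORT': 'Y', 'BIRTH_PLACE': '1', 'DIABETES_PRE': 'N'},
    {'INFANT_ALIVE_AT_REPORT': 'N', 'BIRTH_PLACE': '4', 'DIABETES_PRE': 'U'},
]
cols = find_ynu_columns(rows, ['INFANT_ALIVE_AT_REPORT', 'BIRTH_PLACE', 'DIABETES_PRE'])
print(cols)  # ['INFANT_ALIVE_AT_REPORT']

recoded = recode_rows(rows, cols)
print(recoded[0])  # {'INFANT_ALIVE_AT_REPORT': 1, 'BIRTH_PLACE': '1', 'DIABETES_PRE': 'N'}
```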

Test the recoding on a single feature, INFANT_NICU_ADMISSION, and give the recoded column a new name with alias:

births.select([
    'INFANT_NICU_ADMISSION',
    rec_integer(
        'INFANT_NICU_ADMISSION', func.lit('YNU')
    ).alias('INFANT_NICU_ADMISSION_RECODE')
]).take(5)
[Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),
 Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),
 Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0),
 Row(INFANT_NICU_ADMISSION='N', INFANT_NICU_ADMISSION_RECODE=0),
 Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0)]

Convert all features listed in YNU_cols in a single pass:

# recode every column in YNU_cols (keeping its name) and pass the other columns through
exprs_YNU = [
    rec_integer(x, func.lit('YNU')).alias(x)
    if x in YNU_cols
    else x
    for x in births_transformed.columns
]

births_transformed = births_transformed.select(exprs_YNU)

births_transformed.select(YNU_cols[-5:]).show(5)
+------------+-------------+------------+-------------+------------------+
|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+------------+-------------+------------+-------------+------------------+
|           0|            0|           0|            0|                 0|
|           0|            0|           0|            0|                 0|
|           0|            0|           0|            0|                 0|
|           0|            0|           0|            0|                 1|
|           0|            0|           0|            0|                 0|
+------------+-------------+------------+-------------+------------------+
only showing top 5 rows

Getting to know the data

Descriptive statistics

For the numeric (continuous) features, compute the mean and the standard deviation:

import pyspark.mllib.stat as st
import numpy as np

numeric_cols = ['MOTHER_AGE_YEARS', 'FATHER_COMBINED_AGE',
                'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI', 'CIG_3_TRI',
                'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT',
                'MOTHER_DELIVERY_WEIGHT', 'MOTHER_WEIGHT_GAIN']

# colStats expects an RDD of vectors, so turn every Row into a plain list
numeric_rdd = births_transformed \
    .select(numeric_cols) \
    .rdd \
    .map(lambda row: [e for e in row])

mllib_stats = st.Statistics.colStats(numeric_rdd)

# print the mean and the standard deviation (square root of the variance)
for col, m, v in zip(numeric_cols,
                     mllib_stats.mean(),
                     mllib_stats.variance()):
    print('{0}: \t{1:.2f} \t {2:.2f}'.format(col, m, np.sqrt(v)))
MOTHER_AGE_YEARS:   28.30    6.08
FATHER_COMBINED_AGE:    44.55    27.55
CIG_BEFORE:     1.43     5.18
CIG_1_TRI:  0.91     3.83
CIG_2_TRI:  0.70     3.31
CIG_3_TRI:  0.58     3.11
MOTHER_HEIGHT_IN:   65.12    6.45
MOTHER_PRE_WEIGHT:  214.50   210.21
MOTHER_DELIVERY_WEIGHT:     223.63   180.01
MOTHER_WEIGHT_GAIN:     30.74    26.23

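From these results we can see that:
- fathers are on average older than mothers (44.55 versus 28.30 years), although the unusually large standard deviation of FATHER_COMBINED_AGE (27.55) suggests this column may still contain unknown-value codes that inflate its mean
- many mothers quit or cut down smoking once pregnant: the average cigarette count falls from 1.43 before pregnancy to 0.58 in the third trimester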
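colStats computes these statistics in a distributed way, so partial results from different partitions have to be combined. As a hedged illustration of how that can work (the standard single-pass Welford update plus the parallel merge formula, not a copy of MLlib's code), here is a small plain-Python summarizer. It reports the sample variance (denominator n - 1), which as far as we know is also what colStats reports. The class `RunningStats` and the two "partitions" are hypothetical.

```python
import math


class RunningStats:
    """Single-pass mean/variance that can be merged across partitions."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0  # sum of squared deviations from the mean

    def add(self, x):
        # Welford's online update
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)
        return self

    def merge(self, other):
        # combine two partial summaries with the parallel variance formula
        if other.n == 0:
            return self
        if self.n == 0:
            return other
        merged = RunningStats()
        merged.n = self.n + other.n
        delta = other.mean - self.mean
        merged.mean = self.mean + delta * other.n / merged.n
        merged.m2 = self.m2 + other.m2 + delta ** 2 * self.n * other.n / merged.n
        return merged

    def variance(self):
        # sample variance (n - 1 in the denominator)
        return self.m2 / (self.n - 1)


def summarize(values):
    stats = RunningStats()
    for v in values:
        stats.add(v)
    return stats


# two hypothetical "partitions" of one numeric column
left = summarize([1.0, 2.0])
right = summarize([3.0, 4.0])
total = left.merge(right)
print(total.mean, math.sqrt(total.variance()))
```

Merging the two halves gives the same mean (2.5) and sample variance (5/3) as processing [1, 2, 3, 4] in one pass.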

For the categorical features, count how often each value occurs:

categorical_cols = [e for e in births_transformed.columns
                    if e not in numeric_cols]

categorical_rdd = births_transformed \
    .select(categorical_cols) \
    .rdd \
    .map(lambda row: [e for e in row])

for i, col in enumerate(categorical_cols):
    # group rows by the value of the i-th column and count the rows in each group
    agg = categorical_rdd \
        .groupBy(lambda row: row[i]) \
        .map(lambda row: (row[0], len(row[1])))

    print(col, sorted(agg.collect(),
                      key=lambda el: el[1],
                      reverse=True))
INFANT_ALIVE_AT_REPORT [(1, 23349), (0, 22080)]
BIRTH_PLACE [('1', 44558), ('4', 327), ('3', 224), ('2', 136), ('7', 91), ('5', 74), ('6', 11), ('9', 8)]
DIABETES_PRE [(0, 44881), (1, 548)]
DIABETES_GEST [(0, 43451), (1, 1978)]
HYP_TENS_PRE [(0, 44348), (1, 1081)]
HYP_TENS_GEST [(0, 43302), (1, 2127)]
PREV_BIRTH_PRETERM [(0, 43088), (1, 2341)]

From these counts we can see that:
- most babies are born in a hospital (BIRTH_PLACE=1)
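The frequency count above groups every row by value and then takes the length of each group. A plain-Python equivalent is a `Counter`. On an RDD, `countByValue()`, or a `map` to `(value, 1)` followed by `reduceByKey`, should give the same counts without materializing each group, which is likely cheaper on large data. The rows below are hypothetical.

```python
from collections import Counter


def value_frequencies(rows, index):
    """Count each distinct value in column `index`, most frequent first."""
    counts = Counter(row[index] for row in rows)
    return sorted(counts.items(), key=lambda kv: kv[1], reverse=True)


# hypothetical rows: (INFANT_ALIVE_AT_REPORT, BIRTH_PLACE)
rows = [(1, '1'), (0, '1'), (1, '1'), (1, '4'), (0, '3')]
print(value_frequencies(rows, 0))  # [(1, 3), (0, 2)]
print(value_frequencies(rows, 1))  # [('1', 3), ('4', 1), ('3', 1)]
```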

Correlations between numeric features

corrs = st.Statistics.corr(numeric_rdd)

# for each feature, list the other features whose correlation with it exceeds 0.5
for i, el in enumerate(corrs > 0.5):
    correlated = [
        (numeric_cols[j], corrs[i][j])
        for j, e in enumerate(el)
        if e == 1.0 and j != i]

    if len(correlated) > 0:
        for e in correlated:
            print('{0}-to-{1}: {2:.2f}' \
                  .format(numeric_cols[i], e[0], e[1]))
CIG_BEFORE-to-CIG_1_TRI: 0.83
CIG_BEFORE-to-CIG_2_TRI: 0.72
CIG_BEFORE-to-CIG_3_TRI: 0.62
CIG_1_TRI-to-CIG_BEFORE: 0.83
CIG_1_TRI-to-CIG_2_TRI: 0.87
CIG_1_TRI-to-CIG_3_TRI: 0.76
CIG_2_TRI-to-CIG_BEFORE: 0.72
CIG_2_TRI-to-CIG_1_TRI: 0.87
CIG_2_TRI-to-CIG_3_TRI: 0.89
CIG_3_TRI-to-CIG_BEFORE: 0.62
CIG_3_TRI-to-CIG_1_TRI: 0.76
CIG_3_TRI-to-CIG_2_TRI: 0.89
MOTHER_PRE_WEIGHT-to-MOTHER_DELIVERY_WEIGHT: 0.54
MOTHER_PRE_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.65
MOTHER_DELIVERY_WEIGHT-to-MOTHER_PRE_WEIGHT: 0.54
MOTHER_DELIVERY_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.60
MOTHER_WEIGHT_GAIN-to-MOTHER_PRE_WEIGHT: 0.65
MOTHER_WEIGHT_GAIN-to-MOTHER_DELIVERY_WEIGHT: 0.60
print(corrs.shape)
(10, 10)
# keep one representative of each group of highly correlated features
features_to_keep = [
    'INFANT_ALIVE_AT_REPORT',
    'BIRTH_PLACE',
    'MOTHER_AGE_YEARS',
    'FATHER_COMBINED_AGE',
    'CIG_1_TRI',
    'MOTHER_HEIGHT_IN',
    'MOTHER_PRE_WEIGHT',
    'DIABETES_PRE',
    'DIABETES_GEST',
    'HYP_TENS_PRE',
    'HYP_TENS_GEST',
    'PREV_BIRTH_PRETERM'
]
births_transformed = births_transformed.select(features_to_keep)

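From the results above we can see that:
- the CIG_... features are highly correlated with one another, so only CIG_1_TRI is kept
- the weight features are highly correlated with one another, so only MOTHER_PRE_WEIGHT is kept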
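Statistics.corr uses Pearson correlation by default. The filtering step can be sketched in plain Python as follows. The column values are hypothetical. Like the loop above, the sketch flags only pairs with r above +0.5, so a strong negative correlation (here -0.8) is not reported. Comparing `abs(r)` instead would catch those pairs too.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sd_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
    sd_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
    return cov / (sd_x * sd_y)


def correlated_pairs(columns, threshold=0.5):
    """Return (a, b, r) for every pair of columns with r > threshold."""
    names = list(columns)
    pairs = []
    for i, a in enumerate(names):
        for b in names[i + 1:]:
            r = pearson(columns[a], columns[b])
            if r > threshold:
                pairs.append((a, b, r))
    return pairs


# hypothetical column values
columns = {
    'CIG_1_TRI': [0, 1, 2, 3],
    'CIG_2_TRI': [0, 2, 4, 6],
    'MOTHER_AGE_YEARS': [3, 1, 2, 0],
}
pairs = correlated_pairs(columns)
for a, b, r in pairs:
    print('{0}-to-{1}: {2:.2f}'.format(a, b, r))  # CIG_1_TRI-to-CIG_2_TRI: 1.00
```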

Association between each categorical feature and the label (chi-square test)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
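import pyspark.mllib.linalg as ln

# test each categorical feature (skipping the label itself) for independence from the label
for cat in categorical_cols[1:]:
    # contingency table: one row per label value, one column per category
    agg = births_transformed \
        .groupby('INFANT_ALIVE_AT_REPORT') \
        .pivot(cat) \
        .count()

    # drop the label column, replace missing counts with 0 and flatten row by row
    agg_rdd = agg \
        .rdd \
        .map(lambda row: (row[1:])) \
        .flatMap(lambda row:
                 [0 if e is None else e for e in row]) \
        .collect()

    # Matrices.dense is column-major, so each label's counts become one column
    row_length = len(agg.collect()[0]) - 1
    agg = ln.Matrices.dense(row_length, 2, agg_rdd)

    test = st.Statistics.chiSqTest(agg)
    print(cat, round(test.pValue, 4))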
BIRTH_PLACE 0.0
DIABETES_PRE 0.0
DIABETES_GEST 0.0
HYP_TENS_PRE 0.0
HYP_TENS_GEST 0.0
PREV_BIRTH_PRETERM 0.0
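Two details of this step are easy to get wrong. First, Matrices.dense fills its values column by column, so the flattened per-label counts become the two columns of the contingency table. Second, the test compares observed counts with the counts expected if the feature were independent of the label. The plain-Python sketch below uses toy counts and hypothetical helper names. It rebuilds a column-major matrix and computes Pearson's statistic without a continuity correction. Its p-value helper only covers the 1-degree-of-freedom case of a 2x2 table. A p-value that rounds to 0.0, as for every feature above, means such counts would be extremely unlikely if the feature were independent of survival.

```python
import math


def from_column_major(values, num_rows, num_cols):
    """Rebuild a row-oriented table from column-major values (Matrices.dense layout)."""
    return [[values[j * num_rows + i] for j in range(num_cols)]
            for i in range(num_rows)]


def chi_square(observed):
    """Pearson's chi-square statistic and degrees of freedom for a contingency table."""
    row_sums = [sum(r) for r in observed]
    col_sums = [sum(c) for c in zip(*observed)]
    total = sum(row_sums)
    stat = 0.0
    for i, row in enumerate(observed):
        for j, count in enumerate(row):
            expected = row_sums[i] * col_sums[j] / total
            stat += (count - expected) ** 2 / expected
    dof = (len(observed) - 1) * (len(observed[0]) - 1)
    return stat, dof


def p_value_1dof(stat):
    """Upper-tail probability of a chi-square variable with 1 degree of freedom."""
    return math.erfc(math.sqrt(stat / 2))


# hypothetical counts: first 2 values = label-0 column, next 2 = label-1 column
values = [10, 20, 20, 10]
table = from_column_major(values, num_rows=2, num_cols=2)
stat, dof = chi_square(table)
print(table, round(stat, 4), dof)  # [[10, 20], [20, 10]] 6.6667 1
print(p_value_1dof(stat))
```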

Creating the final dataset

Create an RDD of LabeledPoint objects

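import pyspark.mllib.feature as ft
import pyspark.mllib.linalg as ln
import pyspark.mllib.regression as reg

# hash the categorical BIRTH_PLACE feature into a 7-dimensional vector
hashing = ft.HashingTF(7)

# 1. replace BIRTH_PLACE with its hashed vector (as a list), keep the other values
# 2. wrap every scalar int in a one-element list
# 3. flatten the row into a single list of numbers
# 4. the first element is the label, the rest form the feature vector
births_hashed = births_transformed \
    .rdd \
    .map(lambda row: [
        list(hashing.transform(row[1]).toArray())
        if col == 'BIRTH_PLACE'
        else row[i]
        for i, col
        in enumerate(features_to_keep)]) \
    .map(lambda row: [[e] if isinstance(e, int) else e
                      for e in row]) \
    .map(lambda row: [item for sublist in row
                      for item in sublist]) \
    .map(lambda row: reg.LabeledPoint(
        row[0],
        ln.Vectors.dense(row[1:]))
    )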
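HashingTF maps each term to a bucket index (a hash modulo numFeatures) and counts the terms in each bucket. Because `row[1]` is a string, it is treated as a sequence of characters. That works here only because every BIRTH_PLACE code is a single character. With 8 distinct BIRTH_PLACE values ('1' to '7' and '9') and only 7 buckets, at least two codes must share a bucket. The plain-Python sketch below imitates this step. `zlib.crc32` is an assumption standing in for the hash function MLlib uses, so the bucket positions will not match Spark's. `to_labeled_point` is a hypothetical helper that returns a `(label, features)` tuple rather than a real LabeledPoint.

```python
import zlib

NUM_FEATURES = 7  # same size as ft.HashingTF(7)


def bucket(term, num_features=NUM_FEATURES):
    # ASSUMPTION: crc32 replaces MLlib's hash function, so indices differ from Spark's
    return zlib.crc32(term.encode('utf-8')) % num_features


def hashing_tf(document, num_features=NUM_FEATURES):
    """Term-frequency vector with one count per hash bucket."""
    vec = [0.0] * num_features
    for term in document:  # a str is iterated character by character
        vec[bucket(term, num_features)] += 1.0
    return vec


def to_labeled_point(row):
    """Hypothetical helper: (label, BIRTH_PLACE, *numeric) -> (label, flat features)."""
    label, birth_place, *numeric = row
    features = hashing_tf(birth_place) + [float(x) for x in numeric]
    return float(label), features


label, features = to_labeled_point((1, '1', 29, 30))
print(label, len(features))  # 1.0 9

codes = ['1', '2', '3', '4', '5', '6', '7', '9']
print(len({bucket(c) for c in codes}) < len(codes))  # True: at least one collision
```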

Training and test sets

births_train, births_test = births_hashed.randomSplit([0.6, 0.4])
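randomSplit assigns each record to one of the splits at random, with probabilities proportional to the weights. The split sizes are therefore only approximately 60/40. Because no seed is passed, the split, and with it the evaluation metrics, changes from run to run. PySpark's `randomSplit` also accepts a `seed` argument (e.g. `births_hashed.randomSplit([0.6, 0.4], seed=42)`) for repeatable splits. The plain-Python sketch below illustrates the idea only. It is not Spark's per-partition sampling implementation, and `random_split` is a hypothetical name.

```python
import random


def random_split(items, weights, seed=None):
    """Assign each item to a split with probability proportional to its weight."""
    total = sum(weights)
    bounds = []
    cumulative = 0.0
    for w in weights:
        cumulative += w / total
        bounds.append(cumulative)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for item in items:
        u = rng.random()
        # first split whose cumulative bound exceeds u (the last split absorbs rounding)
        k = next((idx for idx, b in enumerate(bounds) if u < b), len(bounds) - 1)
        splits[k].append(item)
    return splits


train, test = random_split(range(1000), [0.6, 0.4], seed=42)
print(len(train), len(test))  # sizes near 600 and 400; exact numbers depend on the seed
```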

Predicting the infants' chances of survival

Logistic regression

Training

from pyspark.mllib.classification \
    import LogisticRegressionWithLBFGS

LR_Model = LogisticRegressionWithLBFGS \
    .train(births_train, iterations=10)

Testing

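# BinaryClassificationMetrics expects (score, label) pairs;
# predict() returns integers, so the predictions are cast to float
LR_results = LR_Model \
    .predict(births_test.map(lambda row: row.features)) \
    .map(lambda prediction: prediction * 1.0) \
    .zip(births_test.map(lambda row: row.label))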

Evaluating the test results

import pyspark.mllib.evaluation as ev
LR_evaluation = ev.BinaryClassificationMetrics(LR_results)

print('Area under PR: {0:.2f}' \
.format(LR_evaluation.areaUnderPR))
print('Area under ROC: {0:.2f}' \
.format(LR_evaluation.areaUnderROC))
LR_evaluation.unpersist()
Area under PR: 0.85
Area under ROC: 0.63
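BinaryClassificationMetrics takes an RDD of (score, label) pairs. When the scores are hard 0/1 predictions, as with the default thresholded `predict()`, the ROC curve has a single interior point and its area reduces to (TPR + TNR) / 2. Calling `clearThreshold()` on the logistic regression model before predicting should make `predict()` return probabilities instead, which gives a finer-grained curve. The plain-Python sketch below computes ROC AUC as the probability that a random positive outranks a random negative (ties count half), which equals the trapezoidal area under the ROC curve. The pairs are made up.

```python
def area_under_roc(score_and_labels):
    """ROC AUC from (score, label) pairs; a positive/negative tie counts 0.5."""
    positives = [s for s, y in score_and_labels if y == 1.0]
    negatives = [s for s, y in score_and_labels if y == 0.0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# hypothetical hard 0/1 predictions paired with the true labels, as (score, label)
hard_pairs = [(1.0, 1.0), (1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (1.0, 0.0)]
auc = area_under_roc(hard_pairs)
tpr = 2 / 3  # 2 of the 3 positives are predicted as 1
tnr = 1 / 2  # 1 of the 2 negatives is predicted as 0
print(round(auc, 4), round((tpr + tnr) / 2, 4))  # 0.5833 0.5833

# the same labels with hypothetical probability scores
prob_pairs = [(0.9, 1.0), (0.8, 1.0), (0.3, 1.0), (0.2, 0.0), (0.6, 0.0)]
print(round(area_under_roc(prob_pairs), 4))  # 0.8333
```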

Selecting the most predictive features

Use a chi-square test (ChiSqSelector) to keep the four most predictive features:

selector = ft.ChiSqSelector(4).fit(births_train)

topFeatures_train = (
births_train.map(lambda row: row.label) \
.zip(selector \
.transform(births_train \
.map(lambda row: row.features)))
).map(lambda row: reg.LabeledPoint(row[0], row[1]))

topFeatures_test = (
births_test.map(lambda row: row.label) \
.zip(selector \
.transform(births_test \
.map(lambda row: row.features)))
).map(lambda row: reg.LabeledPoint(row[0], row[1]))
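ChiSqSelector(4) scores each feature with a chi-square test against the label, treating every distinct feature value as a category, and keeps the top 4. The sketch below ranks features by the raw chi-square statistic. MLlib's own ranking criterion (statistic or p-value) may depend on the Spark version, so treat this as an illustration of the idea rather than a reimplementation. The points and helper names are hypothetical.

```python
from collections import Counter


def chi_square_statistic(pairs):
    """Pearson chi-square statistic for (feature_value, label) pairs."""
    total = len(pairs)
    cell = Counter(pairs)
    value_counts = Counter(v for v, _ in pairs)
    label_counts = Counter(y for _, y in pairs)
    stat = 0.0
    for v, v_count in value_counts.items():
        for y, y_count in label_counts.items():
            expected = v_count * y_count / total
            stat += (cell[(v, y)] - expected) ** 2 / expected
    return stat


def select_top_features(points, k):
    """Indices of the k features with the largest chi-square statistic, in index order."""
    num_features = len(points[0][1])
    scored = [
        (chi_square_statistic([(f[j], y) for y, f in points]), j)
        for j in range(num_features)
    ]
    scored.sort(key=lambda s: s[0], reverse=True)
    return sorted(j for _, j in scored[:k])


# hypothetical (label, features): feature 0 tracks the label, 1 is constant, 2 is noise
points = [
    (0.0, [0.0, 5.0, 1.0]),
    (0.0, [0.0, 5.0, 0.0]),
    (1.0, [1.0, 5.0, 1.0]),
    (1.0, [1.0, 5.0, 0.0]),
]
print(select_top_features(points, 1))  # [0]
```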

Logistic regression on the selected features

LR_Model_2 = LogisticRegressionWithLBFGS \
.train(topFeatures_train, iterations=10)

# (score, label) pairs, as expected by BinaryClassificationMetrics
LR_results_2 = LR_Model_2 \
    .predict(topFeatures_test.map(lambda row: row.features)) \
    .map(lambda prediction: prediction * 1.0) \
    .zip(topFeatures_test.map(lambda row: row.label))

LR_evaluation_2 = ev.BinaryClassificationMetrics(LR_results_2)

print('Area under PR: {0:.2f}' \
.format(LR_evaluation_2.areaUnderPR))
print('Area under ROC: {0:.2f}' \
.format(LR_evaluation_2.areaUnderROC))
LR_evaluation_2.unpersist()
Area under PR: 0.88
Area under ROC: 0.61

Random forest

Training

from pyspark.mllib.tree import RandomForest

RF_model = RandomForest \
.trainClassifier(data=topFeatures_train,
numClasses=2,
categoricalFeaturesInfo={},
numTrees=6,
featureSubsetStrategy='all',
seed=666)
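With `featureSubsetStrategy='all'`, every tree may consider all features at every split, so the 6 trees differ only through the bootstrap samples they are trained on. The forest then predicts by majority vote. The sketch below illustrates this bagging-and-voting idea in plain Python using one-split decision stumps. That is a deliberate simplification, since MLlib grows deeper trees (maxDepth defaults to 4). The data and helper names are hypothetical.

```python
import random
from collections import Counter


def bootstrap_sample(rows, rng):
    """Draw len(rows) rows with replacement."""
    return [rng.choice(rows) for _ in rows]


def fit_stump(rows):
    """One-split tree: the (feature, threshold, side) with the best training accuracy."""
    best = None
    num_features = len(rows[0][1])
    for j in range(num_features):
        for t in sorted({f[j] for _, f in rows}):
            for left in (0.0, 1.0):
                correct = sum(1 for y, f in rows
                              if (left if f[j] <= t else 1.0 - left) == y)
                if best is None or correct > best[0]:
                    best = (correct, j, t, left)
    _, j, t, left = best
    return lambda f: left if f[j] <= t else 1.0 - left


def fit_forest(rows, num_trees, seed):
    """Train each stump on its own bootstrap sample (no feature subsampling)."""
    rng = random.Random(seed)
    return [fit_stump(bootstrap_sample(rows, rng)) for _ in range(num_trees)]


def predict(forest, features):
    """Majority vote over the trees."""
    votes = Counter(tree(features) for tree in forest)
    return votes.most_common(1)[0][0]


# hypothetical separable data: label 0 for x < 10, label 1 for x >= 10
rows = [(0.0, [float(x)]) for x in range(10)] + \
       [(1.0, [float(x)]) for x in range(10, 20)]
forest = fit_forest(rows, num_trees=6, seed=666)
print(predict(forest, [0.0]), predict(forest, [19.0]))
```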

Testing

# (score, label) pairs; RandomForestModel.predict already returns floats
RF_results = RF_model \
    .predict(topFeatures_test.map(lambda row: row.features)) \
    .zip(topFeatures_test.map(lambda row: row.label))

RF_evaluation = ev.BinaryClassificationMetrics(RF_results)

print('Area under PR: {0:.2f}' \
.format(RF_evaluation.areaUnderPR))
print('Area under ROC: {0:.2f}' \
.format(RF_evaluation.areaUnderROC))
RF_evaluation.unpersist()
Area under PR: 0.88
Area under ROC: 0.62

Summary

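  • This article showed how to use Spark + MLlib to load, transform and clean data and to run machine learning algorithms, applied to predicting infants' chances of survival
  • MLlib works only with RDDs, which makes it relatively inefficient; the other machine learning package, ML (pyspark.ml), works with DataFrames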

References

  • PySpark实战指南 (Chinese edition of Learning PySpark), by Drabas and Lee, translated by 栾云杰 et al.