This post shows how to use PySpark's MLlib package to load and transform data, analyze and extract features, and train logistic regression and random forest models, with predicting infant survival as the running example.
Loading and transforming the data
Define the dataset's schema
```python
import pyspark.sql.types as typ
```
Load the data
```python
births = spark.read.csv('births_train.csv.gz',
                        header=True,
                        schema=schema)
```
Define the recode dictionary
```python
recode_dictionary = {
    'YNU': {'Y': 1, 'N': 0, 'U': 0}
}
```
Select the birth-related features of interest
```python
selected_features = [
    'INFANT_ALIVE_AT_REPORT', 'BIRTH_PLACE', 'MOTHER_AGE_YEARS',
    # ... plus the other columns summarized in the statistics below
]
```
Two encodings are applied:
- Yes/No/Unknown is encoded as 1/0/0;
- the smoking count is encoded as follows: 0 means the mother did not smoke before or during the pregnancy; 1-97 is the actual number of cigarettes smoked; 98 means the mother smoked 98 or more; 99 means the count is unknown. The unknown state (99) is recoded to 0.
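The two rules above can be written out in plain Python; this is only a sketch of the logic (the actual pipeline applies the same mapping through a Spark UDF built from `recode_dictionary`):

```python
# Y/N/U -> 1/0/0, exactly the YNU mapping from recode_dictionary
YNU = {'Y': 1, 'N': 0, 'U': 0}

def recode_ynu(value):
    """Map Y/N/U to 1/0/0."""
    return YNU[value]

def correct_cig(count):
    """Smoking count: 99 means 'unknown' and is recoded to 0."""
    return 0 if count == 99 else count

print(recode_ynu('U'), correct_cig(99), correct_cig(45))  # → 0 0 45
```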
```python
import pyspark.sql.functions as func
```
Correct the smoking-count features
```python
# withColumn takes the column name as its first argument and the
# transforming expression as its second
```
Correct the Y/N/U features. First, find the columns that contain 'Y' values.
```python
cols = [(col.name, col.dataType) for col in births_trimmed.schema]
```
['INFANT_ALIVE_AT_REPORT', 'DIABETES_PRE', 'DIABETES_GEST', 'HYP_TENS_PRE', 'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM']
Transform the features in bulk, renaming each transformed column
```python
births.select([
```
[Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),
Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),
Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0),
Row(INFANT_NICU_ADMISSION='N', INFANT_NICU_ADMISSION_RECODE=0),
Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0)]
Transform all of the features in YNU_cols at once
```python
exprs_YNU = [
```
+------------+-------------+------------+-------------+------------------+
|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+------------+-------------+------------+-------------+------------------+
| 0| 0| 0| 0| 0|
| 0| 0| 0| 0| 0|
| 0| 0| 0| 0| 0|
| 0| 0| 0| 0| 1|
| 0| 0| 0| 0| 0|
+------------+-------------+------------+-------------+------------------+
only showing top 5 rows
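What the batched transformation accomplishes can be sketched in plain Python: one recode rule applied to every flagged column in a single pass over the records (column names taken from the output above; the real pipeline does this with one `select` expression per column):

```python
YNU = {'Y': 1, 'N': 0, 'U': 0}

def recode_row(row, ynu_cols):
    """Recode every Y/N/U column of one record, leaving the rest intact."""
    return {k: (YNU[v] if k in ynu_cols else v) for k, v in row.items()}

row = {'DIABETES_PRE': 'N', 'PREV_BIRTH_PRETERM': 'Y', 'MOTHER_AGE_YEARS': 28}
print(recode_row(row, {'DIABETES_PRE', 'PREV_BIRTH_PRETERM'}))
```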
Understanding the data
Descriptive statistics
For the numeric features, compute the mean and standard deviation of each column.
```python
import pyspark.mllib.stat as st
```
MOTHER_AGE_YEARS: 28.30 6.08
FATHER_COMBINED_AGE: 44.55 27.55
CIG_BEFORE: 1.43 5.18
CIG_1_TRI: 0.91 3.83
CIG_2_TRI: 0.70 3.31
CIG_3_TRI: 0.58 3.11
MOTHER_HEIGHT_IN: 65.12 6.45
MOTHER_PRE_WEIGHT: 214.50 210.21
MOTHER_DELIVERY_WEIGHT: 223.63 180.01
MOTHER_WEIGHT_GAIN: 30.74 26.23
From these results we can see that:
- the fathers are older on average than the mothers;
- many mothers quit smoking once pregnant.
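The per-column numbers above are just a mean and a standard deviation; a stdlib equivalent for a single column, using toy values rather than the real data:

```python
import statistics

ages = [22, 25, 28, 31, 34]      # toy stand-in for MOTHER_AGE_YEARS
mean = statistics.fmean(ages)
stdev = statistics.stdev(ages)
print(f'MOTHER_AGE_YEARS: {mean:.2f} {stdev:.2f}')
```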
For the categorical features, count how often each value occurs.
```python
categorical_cols = [e for e in births_transformed.columns
```
INFANT_ALIVE_AT_REPORT [(1, 23349), (0, 22080)]
BIRTH_PLACE [('1', 44558), ('4', 327), ('3', 224), ('2', 136), ('7', 91), ('5', 74), ('6', 11), ('9', 8)]
DIABETES_PRE [(0, 44881), (1, 548)]
DIABETES_GEST [(0, 43451), (1, 1978)]
HYP_TENS_PRE [(0, 44348), (1, 1081)]
HYP_TENS_GEST [(0, 43302), (1, 2127)]
PREV_BIRTH_PRETERM [(0, 43088), (1, 2341)]
From these counts we can see that:
- most of the births took place in a hospital (BIRTH_PLACE = 1).
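The per-category counts shown above boil down to a frequency table; in plain Python (toy values standing in for BIRTH_PLACE):

```python
from collections import Counter

birth_places = ['1', '1', '4', '1', '3', '1', '2']   # toy BIRTH_PLACE values
print(Counter(birth_places).most_common())           # sorted by frequency
```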
Correlations between the numeric features
```python
corrs = st.Statistics.corr(numeric_rdd)
```
CIG_BEFORE-to-CIG_1_TRI: 0.83
CIG_BEFORE-to-CIG_2_TRI: 0.72
CIG_BEFORE-to-CIG_3_TRI: 0.62
CIG_1_TRI-to-CIG_BEFORE: 0.83
CIG_1_TRI-to-CIG_2_TRI: 0.87
CIG_1_TRI-to-CIG_3_TRI: 0.76
CIG_2_TRI-to-CIG_BEFORE: 0.72
CIG_2_TRI-to-CIG_1_TRI: 0.87
CIG_2_TRI-to-CIG_3_TRI: 0.89
CIG_3_TRI-to-CIG_BEFORE: 0.62
CIG_3_TRI-to-CIG_1_TRI: 0.76
CIG_3_TRI-to-CIG_2_TRI: 0.89
MOTHER_PRE_WEIGHT-to-MOTHER_DELIVERY_WEIGHT: 0.54
MOTHER_PRE_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.65
MOTHER_DELIVERY_WEIGHT-to-MOTHER_PRE_WEIGHT: 0.54
MOTHER_DELIVERY_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.60
MOTHER_WEIGHT_GAIN-to-MOTHER_PRE_WEIGHT: 0.65
MOTHER_WEIGHT_GAIN-to-MOTHER_DELIVERY_WEIGHT: 0.60
```python
print(corrs.shape)
```
(10, 10)
```python
features_to_keep = [
```
The correlation matrix shows that:
- the `CIG_...` features are highly correlated with one another, so only `CIG_1_TRI` is kept;
- the weight features are also highly correlated, so only `MOTHER_PRE_WEIGHT` is kept.
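The pruning decision can be sketched as a greedy rule: walk the features in priority order and drop any feature that correlates too strongly with one already kept. Which member of a correlated group survives is a modeling choice (here `CIG_1_TRI` and `MOTHER_PRE_WEIGHT`), and the threshold below is illustrative:

```python
def prune_correlated(features, corr, threshold=0.6):
    """Keep a feature only if |r| with every already-kept feature is low."""
    kept = []
    for i, name in enumerate(features):
        if all(abs(corr[i][j]) < threshold for j, _ in kept):
            kept.append((i, name))
    return [name for _, name in kept]

features = ['CIG_1_TRI', 'CIG_BEFORE', 'MOTHER_PRE_WEIGHT']
corr = [[1.00, 0.83, 0.05],
        [0.83, 1.00, 0.07],
        [0.05, 0.07, 1.00]]
print(prune_correlated(features, corr))  # → ['CIG_1_TRI', 'MOTHER_PRE_WEIGHT']
```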
Correlations between the categorical features (chi-square test)
```python
import pyspark.mllib.linalg as ln
```
BIRTH_PLACE 0.0
DIABETES_PRE 0.0
DIABETES_GEST 0.0
HYP_TENS_PRE 0.0
HYP_TENS_GEST 0.0
PREV_BIRTH_PRETERM 0.0
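Under the hood, the chi-square test compares observed counts against the counts expected under independence in a contingency table. A minimal Pearson statistic in plain Python, with toy counts (not the real data):

```python
def chi2_stat(table):
    """Pearson chi-square statistic for a two-dimensional count table."""
    row_totals = [sum(row) for row in table]
    col_totals = [sum(col) for col in zip(*table)]
    total = sum(row_totals)
    stat = 0.0
    for i, row in enumerate(table):
        for j, observed in enumerate(row):
            expected = row_totals[i] * col_totals[j] / total
            stat += (observed - expected) ** 2 / expected
    return stat

# survival (rows) vs a binary feature (columns), toy counts
print(round(chi2_stat([[10, 20], [20, 10]]), 3))  # → 6.667
```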
Creating the final dataset
Create an RDD of `LabeledPoint`s
```python
import pyspark.mllib.feature as ft
```
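The variable name `births_hashed` used below suggests the categorical `BIRTH_PLACE` column is hash-encoded (the book's pipeline uses MLlib's hashing transformer from `pyspark.mllib.feature` for this). The idea in plain Python: hash each category into a fixed-size vector instead of maintaining a lookup table.

```python
def hash_encode(value, num_buckets=7):
    """One-hot-like encoding: hash the category into a fixed-size vector."""
    vec = [0] * num_buckets
    vec[hash(value) % num_buckets] += 1
    return vec

vec = hash_encode('1')   # encode BIRTH_PLACE category '1'
```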
Split into training and test sets
```python
births_train, births_test = births_hashed.randomSplit([0.6, 0.4])
```
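`randomSplit` assigns each record independently, so the 60/40 split is only approximate rather than exact. A plain-Python sketch of that behavior:

```python
import random

def random_split(rows, weight=0.6, seed=666):
    """Send each row to the first bucket with probability `weight`."""
    rng = random.Random(seed)
    first, second = [], []
    for row in rows:
        (first if rng.random() < weight else second).append(row)
    return first, second

train, test = random_split(range(1000))
print(len(train), len(test))   # roughly 600 / 400, not exactly
```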
Predicting infant survival
Logistic regression
Training
```python
from pyspark.mllib.classification \
    import LogisticRegressionWithLBFGS
```
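At prediction time, a trained logistic regression model computes a sigmoid over a weighted sum of the features. A sketch with hypothetical weights (not the model's actual coefficients):

```python
import math

def predict_prob(weights, features, intercept=0.0):
    """Logistic regression scoring: sigmoid of the linear combination."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-z))

p = predict_prob([0.4, -1.2], [1.0, 0.5])   # hypothetical weights/features
```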
Testing
```python
LR_results = (
```
Evaluate the test results
```python
import pyspark.mllib.evaluation as ev
```
Area under PR: 0.85
Area under ROC: 0.63
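The area under ROC reported above has a simple probabilistic reading: the chance that a randomly chosen positive example is scored higher than a randomly chosen negative one. A rank-based computation of it in plain Python:

```python
def roc_auc(scores, labels):
    """AUC = P(score of a random positive > score of a random negative)."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

print(roc_auc([0.9, 0.8, 0.3, 0.2], [1, 1, 0, 0]))  # → 1.0
```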
Selecting the most predictive features
Use the chi-square test to select the most predictive features.
```python
selector = ft.ChiSqSelector(4).fit(births_train)
```
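`ChiSqSelector(4)` keeps the four features that score best on the chi-square test against the label; the selection step itself is just a top-k by score. A toy version with made-up scores:

```python
def select_top_k(scores, k):
    """Return the k feature names with the highest scores."""
    return sorted(scores, key=scores.get, reverse=True)[:k]

# hypothetical chi-square statistics, not the real ones
scores = {'CIG_1_TRI': 8.1, 'DIABETES_PRE': 2.3, 'HYP_TENS_PRE': 5.7,
          'PREV_BIRTH_PRETERM': 9.4, 'BIRTH_PLACE': 1.1}
print(select_top_k(scores, 2))  # → ['PREV_BIRTH_PRETERM', 'CIG_1_TRI']
```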
Run logistic regression again using only the selected features.
```python
LR_Model_2 = LogisticRegressionWithLBFGS \
```
Area under PR: 0.88
Area under ROC: 0.61
Random forest
Training
```python
from pyspark.mllib.tree import RandomForest
```
Testing
```python
RF_results = (
```
Area under PR: 0.88
Area under ROC: 0.62
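A random forest's classification is a majority vote over its trees. The voting step in isolation, with stand-in "trees" (plain callables, not real decision trees):

```python
def forest_predict(trees, x):
    """Majority vote across the ensemble's individual predictions."""
    votes = [tree(x) for tree in trees]
    return max(set(votes), key=votes.count)

trees = [lambda x: 1, lambda x: 0, lambda x: 1]   # stand-in decision trees
print(forest_predict(trees, x=None))  # → 1
```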
Summary
- This post walked through loading, transforming, and cleaning data and running machine learning algorithms with Spark and MLlib, applied to predicting an infant's chances of survival.
- MLlib only supports RDD-based datasets, which is relatively inefficient; ML is the other machine learning package, and it works on DataFrames.
References
- Learning PySpark (Chinese edition: PySpark实战指南), by Tomasz Drabas and Denny Lee; translated by 栾云杰 et al.