Introduction to PySpark ML

Starting from the example of predicting infant survival, this article introduces the capabilities of PySpark's ML package for data loading, data transformation, feature extraction, and machine learning algorithms. It also covers applications such as logistic regression, clustering, natural language processing, and topic extraction.
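
The examples below assume an existing SparkSession named spark. A minimal sketch of creating one (the application name is arbitrary):

from pyspark.sql import SparkSession

# Assumption: no session exists yet; getOrCreate() reuses an existing one if it does.
spark = SparkSession.builder \
    .appName('pyspark-ml-intro') \
    .getOrCreate()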

Predicting Infant Survival

Loading the data

import pyspark.sql.types as typ

labels = [
('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
('BIRTH_PLACE', typ.StringType()),
('MOTHER_AGE_YEARS', typ.IntegerType()),
('FATHER_COMBINED_AGE', typ.IntegerType()),
('CIG_BEFORE', typ.IntegerType()),
('CIG_1_TRI', typ.IntegerType()),
('CIG_2_TRI', typ.IntegerType()),
('CIG_3_TRI', typ.IntegerType()),
('MOTHER_HEIGHT_IN', typ.IntegerType()),
('MOTHER_PRE_WEIGHT', typ.IntegerType()),
('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
('DIABETES_PRE', typ.IntegerType()),
('DIABETES_GEST', typ.IntegerType()),
('HYP_TENS_PRE', typ.IntegerType()),
('HYP_TENS_GEST', typ.IntegerType()),
('PREV_BIRTH_PRETERM', typ.IntegerType())
]

schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels
])

births = spark.read.csv('births_transformed.csv.gz',
header=True,
schema=schema)

Transforming the data

Casting data types

import pyspark.ml.feature as ft

births = births \
    .withColumn('BIRTH_PLACE_INT',
                births['BIRTH_PLACE'] \
                    .cast(typ.IntegerType()))

One-hot encoding

encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE_INT',
    outputCol='BIRTH_PLACE_VEC')
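
A version-related side note: the code above follows the Spark 2.x API, where OneHotEncoder is a plain transformer. In Spark 3.x, OneHotEncoder is an estimator and must be fitted before it can transform data; inside a Pipeline this fit happens automatically, so the pipeline code later in this article is unaffected. A minimal standalone sketch under the Spark 3.x assumption (births_encoded is an illustrative name, not used elsewhere):

# Assumption: Spark 3.x, where OneHotEncoder is an Estimator and needs fit().
births_encoded = encoder.fit(births).transform(births)
births_encoded.select('BIRTH_PLACE_INT', 'BIRTH_PLACE_VEC').take(1)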

Assembling all features into a single column

featuresCreator = ft.VectorAssembler(
    inputCols=[
        col[0]
        for col
        in labels[2:]] + \
        [encoder.getOutputCol()],
    outputCol='features'
)

Creating the estimator

Logistic regression model

import pyspark.ml.classification as cl

logistic = cl.LogisticRegression(
    maxIter=10,
    regParam=0.01,
    labelCol='INFANT_ALIVE_AT_REPORT')

Creating a pipeline

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    encoder,
    featuresCreator,
    logistic
])

Fitting the model

Splitting the data

births_train, births_test = births \
    .randomSplit([0.7, 0.3], seed=666)

Training

model = pipeline.fit(births_train)
test_model = model.transform(births_test)

Testing the model

test_model.take(1)
[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=66, MOTHER_PRE_WEIGHT=133, MOTHER_DELIVERY_WEIGHT=135, MOTHER_WEIGHT_GAIN=2, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0573, -1.0573]), probability=DenseVector([0.7422, 0.2578]), prediction=0.0)]
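
The full rows are verbose; a convenience added here (not in the original) to focus on just the columns of interest:

test_model \
    .select('INFANT_ALIVE_AT_REPORT', 'probability', 'prediction') \
    .show(5, truncate=False)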

Evaluating the model

import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT')

print(evaluator.evaluate(test_model,
                         {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model,
                         {evaluator.metricName: 'areaUnderPR'}))
0.7401301847095617
0.7139354342365674

Saving the model

Saving the pipeline definition

pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline \
    .fit(births_train)\
    .transform(births_test)\
    .take(1)
[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=66, MOTHER_PRE_WEIGHT=133, MOTHER_DELIVERY_WEIGHT=135, MOTHER_WEIGHT_GAIN=2, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0573, -1.0573]), probability=DenseVector([0.7422, 0.2578]), prediction=0.0)]

Saving the fitted model

from pyspark.ml import PipelineModel

modelPath = './infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)

loadedPipelineModel = PipelineModel.load(modelPath)
test_loadedModel = loadedPipelineModel.transform(births_test)
test_loadedModel.take(1)
[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=66, MOTHER_PRE_WEIGHT=133, MOTHER_DELIVERY_WEIGHT=135, MOTHER_WEIGHT_GAIN=2, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0573, -1.0573]), probability=DenseVector([0.7422, 0.2578]), prediction=0.0)]

Hyperparameter Tuning

Grid search

Step 1: define the search space

import pyspark.ml.tuning as tune

logistic = cl.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT')

grid = tune.ParamGridBuilder() \
    .addGrid(logistic.maxIter,
             [2, 10, 50]) \
    .addGrid(logistic.regParam,
             [0.01, 0.05, 0.3]) \
    .build()
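
The grid is just a Python list of parameter maps; with three values of maxIter and three of regParam it contains 3 × 3 = 9 combinations. A quick check:

# Each element of grid is a ParamMap; 9 combinations in total.
print(len(grid))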

Step 2: create the evaluator

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT')

Step 3: set up cross-validation

cv = tune.CrossValidator(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator
)

Step 4: build the data-transformation pipeline

pipeline = Pipeline(stages=[encoder, featuresCreator])
data_transformer = pipeline.fit(births_train)

Step 5: find the best hyperparameter set

cvModel = cv.fit(data_transformer.transform(births_train))
# Transform the held-out test data and score it with the cross-validated model.
data_test = data_transformer \
    .transform(births_test)
results = cvModel.transform(data_test)

print(evaluator.evaluate(results,
                         {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results,
                         {evaluator.metricName: 'areaUnderPR'}))
0.7404959803309813
0.7157971108486731

Extracting the best hyperparameters

results = [
    (
        [
            {key.name: paramValue}
            for key, paramValue
            in zip(
                params.keys(),
                params.values())
        ], metric
    )
    for params, metric
    in zip(
        cvModel.getEstimatorParamMaps(),
        cvModel.avgMetrics
    )
]

sorted(results,
       key=lambda el: el[1],
       reverse=True)[0]
([{'maxIter': 50}, {'regParam': 0.01}], 0.7386350804981119)
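
An alternative worth knowing (a sketch, not from the original text): the CrossValidatorModel also keeps the best fitted model directly, so it can be inspected or reused without re-sorting the metrics. The name best_lr is illustrative.

best_lr = cvModel.bestModel          # LogisticRegressionModel fitted with the winning parameters
print(best_lr)
print(best_lr.extractParamMap())     # resolved parameters of the best model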

Train-validation split

Use the chi-square selector (ChiSqSelector) to select the top five features

selector = ft.ChiSqSelector(
    numTopFeatures=5,
    featuresCol=featuresCreator.getOutputCol(),
    outputCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

logistic = cl.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT',
    featuresCol='selectedFeatures'
)

pipeline = Pipeline(stages=[encoder, featuresCreator, selector])
data_transformer = pipeline.fit(births_train)
tvs = tune.TrainValidationSplit(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator
)
tvsModel = tvs.fit(
    data_transformer \
        .transform(births_train)
)

data_test = data_transformer \
    .transform(births_test)
results = tvsModel.transform(data_test)

print(evaluator.evaluate(results,
                         {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results,
                         {evaluator.metricName: 'areaUnderPR'}))
0.7294296314442145
0.703775950281647

As the results above show, the model with fewer features performs slightly worse, but the difference is not pronounced.

Other Features of PySpark ML

NLP-related feature extraction

Take a simple dataset as an example:

text_data = spark.createDataFrame([
['''Machine learning can be applied to a wide variety
of data types, such as vectors, text, images, and
structured data. This API adopts the DataFrame from
Spark SQL in order to support a variety of data types.'''],
['''DataFrame supports many basic and structured types;
see the Spark SQL datatype reference for a list of
supported types. In addition to the types listed in
the Spark SQL guide, DataFrame can use ML Vector types.'''],
['''A DataFrame can be created either implicitly or
explicitly from a regular RDD. See the code examples
below and the Spark SQL programming guide for examples.'''],
['''Columns in a DataFrame are named. The code examples
below use names such as "text," "features," and "label."''']
], ['input'])

Split the text into words, strip punctuation, and convert everything to lowercase.

tokenizer = ft.RegexTokenizer(
    inputCol='input',
    outputCol='input_arr',
    pattern=r'\s+|[,.\"]')

tok = tokenizer \
    .transform(text_data) \
    .select('input_arr')

tok.take(1)
[Row(input_arr=['machine', 'learning', 'can', 'be', 'applied', 'to', 'a', 'wide', 'variety', 'of', 'data', 'types', 'such', 'as', 'vectors', 'text', 'images', 'and', 'structured', 'data', 'this', 'api', 'adopts', 'the', 'dataframe', 'from', 'spark', 'sql', 'in', 'order', 'to', 'support', 'a', 'variety', 'of', 'data', 'types'])]

Remove stop words such as "a", "be", etc.

stopwords = ft.StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol='input_stop')

stopwords.transform(tok).select('input_stop').take(1)
[Row(input_stop=['machine', 'learning', 'applied', 'wide', 'variety', 'data', 'types', 'vectors', 'text', 'images', 'structured', 'data', 'api', 'adopts', 'dataframe', 'spark', 'sql', 'order', 'support', 'variety', 'data', 'types'])]

Build the NGram model and the pipeline

ngram = ft.NGram(n=2,
                 inputCol=stopwords.getOutputCol(),
                 outputCol="nGrams")

pipeline = Pipeline(stages=[tokenizer, stopwords, ngram])

data_ngram = pipeline \
    .fit(text_data) \
    .transform(text_data)

data_ngram.select('nGrams').take(1)
[Row(nGrams=['machine learning', 'learning applied', 'applied wide', 'wide variety', 'variety data', 'data types', 'types vectors', 'vectors text', 'text images', 'images structured', 'structured data', 'data api', 'api adopts', 'adopts dataframe', 'dataframe spark', 'spark sql', 'sql order', 'order support', 'support variety', 'variety data', 'data types'])]
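
To make these n-grams usable by downstream models, one would typically vectorize them. A minimal sketch (not in the original) that appends a CountVectorizer stage; ngram_vectorizer, pipeline_vec and data_vec are illustrative names:

# Assumption: simple bigram counts are enough for this toy corpus.
ngram_vectorizer = ft.CountVectorizer(
    inputCol=ngram.getOutputCol(),
    outputCol='nGrams_vec')

pipeline_vec = Pipeline(stages=[tokenizer, stopwords, ngram, ngram_vectorizer])
data_vec = pipeline_vec.fit(text_data).transform(text_data)
data_vec.select('nGrams_vec').take(1)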

Discretizing continuous variables

Create a simple dataset

import numpy as np

x = np.arange(0, 100)
x = x / 100.0 * np.pi * 4
y = x * np.sin(x / 1.764) + 20.1234

schema = typ.StructType([
    typ.StructField('continuous_var',
                    typ.DoubleType(),
                    False)
])

data = spark.createDataFrame([[float(e), ] for e in y], schema=schema)

Discretize the continuous variable into five buckets

discretizer = ft.QuantileDiscretizer(
    numBuckets=5,
    inputCol='continuous_var',
    outputCol='discretized')

Check the mean value within each bucket

data_discretized = discretizer.fit(data).transform(data)

data_discretized \
    .groupby('discretized')\
    .mean('continuous_var')\
    .sort('discretized')\
    .collect()
[Row(discretized=0.0, avg(continuous_var)=12.314360733007915),
 Row(discretized=1.0, avg(continuous_var)=16.046244793347466),
 Row(discretized=2.0, avg(continuous_var)=20.25079947835259),
 Row(discretized=3.0, avg(continuous_var)=22.040988218437327),
 Row(discretized=4.0, avg(continuous_var)=24.264824657002865)]

Standardizing continuous features

vectorizer = ft.VectorAssembler(
    inputCols=['continuous_var'],
    outputCol='continuous_vec')
normalizer = ft.StandardScaler(
    inputCol=vectorizer.getOutputCol(),
    outputCol='normalized',
    withMean=True,
    withStd=True
)

pipeline = Pipeline(stages=[vectorizer, normalizer])
data_standardized = pipeline.fit(data).transform(data)
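
A quick look at the result (a convenience added here, not in the original): with withMean=True and withStd=True, the values in the 'normalized' column should be centered near zero with unit standard deviation.

# Eyeball the original value next to its standardized counterpart.
data_standardized \
    .select('continuous_var', 'normalized') \
    .take(3)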

Classification with a Random Forest

This section uses the random forest algorithm to predict infant survival. The label column is first cast to DoubleType, as expected by the classifier.

import pyspark.sql.functions as func

births = births.withColumn(
    'INFANT_ALIVE_AT_REPORT',
    func.col('INFANT_ALIVE_AT_REPORT').cast(typ.DoubleType())
)

births_train, births_test = births \
    .randomSplit([0.7, 0.3], seed=666)
classifier = cl.RandomForestClassifier(
    numTrees=5,
    maxDepth=5,
    labelCol='INFANT_ALIVE_AT_REPORT')

pipeline = Pipeline(
    stages=[
        encoder,
        featuresCreator,
        classifier])

model = pipeline.fit(births_train)
test = model.transform(births_test)
evaluator = ev.BinaryClassificationEvaluator(
    labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test,
                         {evaluator.metricName: "areaUnderROC"}))
print(evaluator.evaluate(test,
                         {evaluator.metricName: "areaUnderPR"}))
0.7625231306933616
0.7474287997552782
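
A sketch not in the original walkthrough: the fitted random forest is the last stage of the fitted pipeline, and its featureImportances vector lines up with the positions of the assembled 'features' column. rf_model is an illustrative name.

rf_model = model.stages[-1]          # fitted RandomForestClassificationModel
print(rf_model.featureImportances)   # sparse vector, indexed like 'features'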

Classification with a single decision tree

classifier = cl.DecisionTreeClassifier(
    maxDepth=5,
    labelCol='INFANT_ALIVE_AT_REPORT')
pipeline = Pipeline(stages=[
    encoder,
    featuresCreator,
    classifier]
)

model = pipeline.fit(births_train)
test = model.transform(births_test)

evaluator = ev.BinaryClassificationEvaluator(
    labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test,
                         {evaluator.metricName: "areaUnderROC"}))
print(evaluator.evaluate(test,
                         {evaluator.metricName: "areaUnderPR"}))
0.7582781726635287
0.7787580540118526
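
For a single tree, the fitted model can also be printed for inspection; a small sketch (tree_model is an illustrative name):

tree_model = model.stages[-1]    # fitted DecisionTreeClassificationModel
print(tree_model.toDebugString)  # text dump of the learned splits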

Clustering

Finding clusters

Use the k-means method to find groups of similar records in the dataset

import pyspark.ml.clustering as clus

kmeans = clus.KMeans(k=5,
                     featuresCol='features')

pipeline = Pipeline(stages=[
    encoder,
    featuresCreator,
    kmeans]
)

model = pipeline.fit(births_train)
test = model.transform(births_test)

test \
    .groupBy('prediction') \
    .agg({
        '*': 'count',
        'MOTHER_HEIGHT_IN': 'avg'
    }).collect()
[Row(prediction=1, avg(MOTHER_HEIGHT_IN)=83.91154791154791, count(1)=407),
 Row(prediction=3, avg(MOTHER_HEIGHT_IN)=66.64658634538152, count(1)=249),
 Row(prediction=4, avg(MOTHER_HEIGHT_IN)=64.31597357170618, count(1)=10292),
 Row(prediction=2, avg(MOTHER_HEIGHT_IN)=67.69473684210526, count(1)=475),
 Row(prediction=0, avg(MOTHER_HEIGHT_IN)=64.43472584856397, count(1)=2298)]
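
A sketch not in the original: the fitted KMeansModel, the last stage of the pipeline, exposes the cluster centroids in the assembled feature space. kmeans_model is an illustrative name.

kmeans_model = model.stages[-1]   # fitted KMeansModel
for center in kmeans_model.clusterCenters():
    print(center)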

Topic mining

A simple example

text_data = spark.createDataFrame([
['''To make a computer do anything, you have to write a
computer program. To write a computer program, you have
to tell the computer, step by step, exactly what you want
it to do. The computer then "executes" the program,
following each step mechanically, to accomplish the end
goal. When you are telling the computer what to do, you
also get to choose how it's going to do it. That's where
computer algorithms come in. The algorithm is the basic
technique used to get the job done. Let's follow an
example to help get an understanding of the algorithm
concept.'''],
['''Laptop computers use batteries to run while not
connected to mains. When we overcharge or overheat
lithium ion batteries, the materials inside start to
break down and produce bubbles of oxygen, carbon dioxide,
and other gases. Pressure builds up, and the hot battery
swells from a rectangle into a pillow shape. Sometimes
the phone involved will operate afterwards. Other times
it will die. And occasionally—kapow! To see what's
happening inside the battery when it swells, the CLS team
used an x-ray technology called computed tomography.'''],
['''This technology describes a technique where touch
sensors can be placed around any side of a device
allowing for new input sources. The patent also notes
that physical buttons (such as the volume controls) could
be replaced by these embedded touch sensors. In essence
Apple could drop the current buttons and move towards
touch-enabled areas on the device for the existing UI. It
could also open up areas for new UI paradigms, such as
using the back of the smartphone for quick scrolling or
page turning.'''],
['''The National Park Service is a proud protector of
America’s lands. Preserving our land not only safeguards
the natural environment, but it also protects the
stories, cultures, and histories of our ancestors. As we
face the increasingly dire consequences of climate
change, it is imperative that we continue to expand
America’s protected lands under the oversight of the
National Park Service. Doing so combats climate change
and allows all American’s to visit, explore, and learn
from these treasured places for generations to come. It
is critical that President Obama acts swiftly to preserve
land that is at risk of external threats before the end
of his term as it has become blatantly clear that the
next administration will not hold the same value for our
environment over the next four years.'''],
['''The National Park Foundation, the official charitable
partner of the National Park Service, enriches America’s
national parks and programs through the support of
private citizens, park lovers, stewards of nature,
history enthusiasts, and wilderness adventurers.
Chartered by Congress in 1967, the Foundation grew out of
a legacy of park protection that began over a century
ago, when ordinary citizens took action to establish and
protect our national parks. Today, the National Park
Foundation carries on the tradition of early park
advocates, big thinkers, doers and dreamers—from John
Muir and Ansel Adams to President Theodore Roosevelt.'''],
['''Australia has over 500 national parks. Over 28
million hectares of land is designated as national
parkland, accounting for almost four per cent of
Australia's land areas. In addition, a further six per
cent of Australia is protected and includes state
forests, nature parks and conservation reserves.National
parks are usually large areas of land that are protected
because they have unspoilt landscapes and a diverse
number of native plants and animals. This means that
commercial activities such as farming are prohibited and
human activity is strictly monitored.''']
], ['documents'])

As in the earlier NLP example, first preprocess the text

tokenizer = ft.RegexTokenizer(
    inputCol='documents',
    outputCol='input_arr',
    pattern=r'\s+|[,.\"]')

stopwords = ft.StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol='input_stop')
stringIndexer = ft.CountVectorizer(
    inputCol=stopwords.getOutputCol(),
    outputCol="input_indexed")

tokenized = stopwords \
    .transform(
        tokenizer\
            .transform(text_data)
    )

stringIndexer \
    .fit(tokenized)\
    .transform(tokenized)\
    .select('input_indexed')\
    .take(2)
[Row(input_indexed=SparseVector(257, {2: 7.0, 6: 1.0, 7: 3.0, 10: 3.0, 11: 3.0, 19: 1.0, 27: 1.0, 31: 1.0, 32: 2.0, 35: 2.0, 40: 1.0, 51: 1.0, 56: 1.0, 65: 1.0, 66: 1.0, 72: 1.0, 74: 1.0, 77: 1.0, 81: 1.0, 83: 1.0, 96: 1.0, 106: 1.0, 111: 1.0, 123: 1.0, 128: 1.0, 163: 1.0, 173: 1.0, 204: 1.0, 206: 1.0, 210: 1.0, 250: 1.0, 253: 1.0, 256: 1.0})),
 Row(input_indexed=SparseVector(257, {18: 2.0, 19: 1.0, 22: 1.0, 28: 2.0, 30: 2.0, 38: 2.0, 45: 1.0, 46: 1.0, 48: 1.0, 50: 1.0, 59: 1.0, 60: 1.0, 62: 1.0, 68: 1.0, 76: 1.0, 92: 1.0, 100: 1.0, 103: 1.0, 107: 1.0, 108: 1.0, 110: 1.0, 113: 1.0, 121: 1.0, 126: 1.0, 131: 1.0, 140: 1.0, 145: 1.0, 146: 1.0, 147: 1.0, 150: 1.0, 151: 1.0, 160: 1.0, 178: 1.0, 179: 1.0, 186: 1.0, 187: 1.0, 191: 1.0, 193: 1.0, 198: 1.0, 199: 1.0, 202: 1.0, 226: 1.0, 232: 1.0, 240: 1.0, 243: 1.0, 247: 1.0, 252: 1.0}))]

Use an LDA (Latent Dirichlet Allocation) model to extract the topics

clustering = clus.LDA(k=2,
                      optimizer='online',
                      featuresCol=stringIndexer.getOutputCol())
pipeline = Pipeline(stages=[
    tokenizer,
    stopwords,
    stringIndexer,
    clustering]
)
topics = pipeline \
    .fit(text_data) \
    .transform(text_data)

topics.select('topicDistribution').collect()
[Row(topicDistribution=DenseVector([0.2357, 0.7643])),
 Row(topicDistribution=DenseVector([0.0362, 0.9638])),
 Row(topicDistribution=DenseVector([0.986, 0.014])),
 Row(topicDistribution=DenseVector([0.039, 0.961])),
 Row(topicDistribution=DenseVector([0.3513, 0.6487])),
 Row(topicDistribution=DenseVector([0.9715, 0.0285]))]
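
To see which words drive each topic, the fitted pipeline can be kept and inspected. A sketch, not in the original; the variable names are illustrative, and the pipeline is refitted here only to keep a handle on the fitted model:

topic_model = pipeline.fit(text_data)
lda_model = topic_model.stages[-1]         # fitted LDAModel
vocab = topic_model.stages[2].vocabulary   # vocabulary of the CountVectorizer stage

for row in lda_model.describeTopics(maxTermsPerTopic=5).collect():
    print([vocab[i] for i in row['termIndices']])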

Regression Models

Select a few features to predict the attribute MOTHER_WEIGHT_GAIN

features = ['MOTHER_AGE_YEARS', 'MOTHER_HEIGHT_IN',
            'MOTHER_PRE_WEIGHT', 'DIABETES_PRE',
            'DIABETES_GEST', 'HYP_TENS_PRE',
            'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM',
            'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI',
            'CIG_3_TRI'
           ]

Assemble the attributes into a single column, then select the six most important features

featuresCreator = ft.VectorAssembler(
    inputCols=[col for col in features[1:]],
    outputCol='features'
)

selector = ft.ChiSqSelector(
    numTopFeatures=6,
    outputCol="selectedFeatures",
    labelCol='MOTHER_WEIGHT_GAIN'
)

Use a gradient-boosted tree regressor to predict the weight gain

import pyspark.ml.regression as reg

regressor = reg.GBTRegressor(
    maxIter=15,
    maxDepth=3,
    labelCol='MOTHER_WEIGHT_GAIN')
pipeline = Pipeline(stages=[
    featuresCreator,
    selector,
    regressor])

weightGain = pipeline.fit(births_train)
evaluator = ev.RegressionEvaluator(
    predictionCol="prediction",
    labelCol='MOTHER_WEIGHT_GAIN')

print(evaluator.evaluate(
    weightGain.transform(births_test),
    {evaluator.metricName: 'r2'}))
0.48862170400240335
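
The same RegressionEvaluator can report other metrics as well, for example RMSE (output not reproduced here):

print(evaluator.evaluate(
    weightGain.transform(births_test),
    {evaluator.metricName: 'rmse'}))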

As we can see, the model does not perform well. This is most likely due to the input features; without better features, it is hard for the model to improve much.

Summary

  • How Spark ML uses Transformers and Estimators, and their role in Pipelines
  • This article covered machine-learning topics such as feature extraction and transformation, logistic regression, clustering, and regression
  • It also gave a brief introduction to hyperparameter tuning

References

  • PySpark实战指南 (Learning PySpark), by Tomasz Drabas and Denny Lee; Chinese translation by 栾云杰 et al.