在上一文中,主要对Spark MLlib机器学习库使用流程进行了介绍。
从搭建环境开始,然后加载数据,探索数据,直到进行模型的训练与评估,最终进行未知数据的预测,即预测婴儿生存机会
本文则来介绍如何使用ML机器学习库来实战ML!同样使用上一节的数据集来演示ML的构建过程。再次尝试预测婴儿的生存几率。
**Pipeline——**管道工作流
管道链接多个转换器和预测器生成一个机器学习工作流。
管道被指定为一系列阶段,每个阶段是一个转换器或一个预测器。
第一行表示一个Pipleline,包含三个阶段。其中Tokenizer和HashingTF是转换,第三个LogiticRegression逻辑回归是预测。下面一行代表流水线中的数据流,圆柱体表示DataFrame,
(1)对于Raw text文本数据和标签生成DataFrame,然后调用Pipeline的fit接口;
(2)调用Tokenizer的transform接口将文本进行分词,并将分词添加到DataFrame;
(3)pipleline调用LogiticRegression.fit产出一个LogiticRegressionModel。
Parameter:所有的Transformer和Estimator共享一个通用的指定参数的API。
Pipline的使用示例,会在ML构建机器学习的案例中体现出来。
1.加载数据
数据地址:https://pan.baidu.com/s/1RJIAR4em2L2XiQpZhBWOgg
提取码:yw39
from pyspark.ml import Pipeline
import pyspark.ml.classification as cl
import pyspark.ml.evaluation as ev
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.types as typ
labels = [('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
('BIRTH_PLACE', typ.StringType()),
('MOTHER_AGE_YEARS', typ.IntegerType()),
('FATHER_COMBINE_AGE', typ.IntegerType()),
('CIG_BEFORE', typ.IntegerType()),
('CIG_1_TRI', typ.IntegerType()),
('CIG_2_TRI', typ.IntegerType()),
('CIG_3_TRI', typ.IntegerType()),
('MOTHER_HEIGHT_IN', typ.IntegerType()),
('MOTHER_PRE_WEIGHT', typ.IntegerType()),
('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
('DIABETES_PRE', typ.IntegerType()),
('DIABETES_GEST', typ.IntegerType()),
('HYP_TENS_PRE', typ.IntegerType()),
('HYP_TENS_GEST', typ.IntegerType()),
('PREV_BIRTH_PRETERM', typ.IntegerType())
]
schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels
])
births = spark.read.csv(
'data/births_transformed.csv', header=True, schema=schema)
births.show(3)
2.创建转换器
1、先将births中的BIRTH_PLACE字段类型修改为数值类型
2、然后创建一个转换器 OneHotEncoder可以对数值类型的数据进行编码,从而转化为数值类型
3、创建一个单一的列,将所有的特征聚集到一起 该方法是一个列表(没有包含标签列),包含所有要组成outputCol的列,outputCol表示输出的列的名为'features'。
# 创建转换器
import pyspark.ml.feature as ft
births = births.withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE']\
.cast(typ.IntegerType()))
# birth place使用one-hot编码
encoder = ft.OneHotEncoder(inputCol='BIRTH_PLACE_INT',
outputCol='BIRTH_PLACE_VEC')
# 创建单一的列将所有特征整合在一起
featuresCreator = ft.VectorAssembler(
inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
outputCol='features'
)
3.创建预测器
这里使用和上节一致的逻辑回归模型LogisticRegression,只不过来自pyspark.ml.classification模块。
在此我们还是先导入依赖,再创建模型。
#需要注意的是,如果数据的标签列的名称为label 则无需指定labelCol,如果featuresCre的输出不为'features',需要使用featuresCre调用getOutputColl()来指明featuresCol
# 预测模型性能
import pyspark.ml.classification as cl
logistic = cl.LogisticRegression(maxIter=10,
regParam=0.01,
featuresCol=featuresCreator.getOutputCol(),
labelCol='INFANT_ALIVE_AT_REPORT')
# 拟合模型
birth_train, birth_test = births.randomSplit([0.7,0.3],seed=123)
model = pipeline.fit(birth_train)
test_model = model.transform(birth_test)
import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol='probability',
labelCol='INFANT_ALIVE_AT_REPORT'
)
4.创建管道
前面创建了两个转换器和一个预测器
现在需要做的事情是将两个转换器和一个预测器连接起来,放入一个管道中。
因此我们需要创建一个管道:依然是导入相应的模块,然后实例化一个管道实例。
代码创建一个管道,并其依次将转换器enco、featuresCre和预测器lr结合了起来
# 创建一个管道
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[encoder, featuresCreator, logistic])
5.训练模型
(在创建预测器的时候已经分好了,这一步要在预测用,可以看上面预测器的代码)
有了转换器、预测器和管道,我们就可以利用数据集进行训练模型了,
但是为了确保模型的说服力,在训练模型之前,需要把数据集拆分为训练集和测试集
其中训练集births_train用来训练模型、测试集births_test用来评估模型
训练模型时,训练集传递给enco转换器,enco转换器输出的DataFrame传递给featuresCre转换器,featuresCre转换器的输出为features列,features列再传递给预测器lr逻辑回归模型。
调用管道模型model对象的transform方法会获得预测值
# 拟合模型
birth_train, birth_test = births.randomSplit([0.7,0.3],seed=123)
model = pipeline.fit(birth_train)
test_model = model.transform(birth_test)
6.使用BinaryClassificationEvaluator对模型评估
此步依然使用BinaryClassificationEvaluator对模型评估
这一步骤与MLlib中基本相同。
先导入模型评估的模块、然后实例化BinaryClassificationEvaluator评估器
最后打印相关评估结果。
# 评估模型性能
import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol='probability',
labelCol='INFANT_ALIVE_AT_REPORT'
)
print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderPR'}))
7.模型保存与调用
包括管道的保存与加载、模型的保存与加载
这一步骤与MLlib中基本相同。
# 保存模型pipeline
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)
# 重载模型pipeline
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(birth_train).transform(birth_test).take(1)
# 保存模型
from pyspark.ml import PipelineModel
modelPath = './infant_oneHotEncoder_LogisticPipelineModel'
model.write().overwrite().save(modelPath)
# 载入模型
loadedPipelineModel = PipelineModel.load(modelPath)
test_reloadedModel = loadedPipelineModel.transform(birth_test)
test_reloadedModel.take(1)
通过本文的学习,大致了解了Spark.ML机器学习库的使用步骤:
包括加载数据、创建转换器和预测器、创建管道、训练模型、模型评估以及模型的保存与调用
ML与上一节MLlib的使用步骤主要区别在于预测器和管道的创建、这是ML特殊的地方。
而且从spark2.x版本开始人们更倾向于使用ML