在上一文中,主要对Spark MLlib机器学习库使用流程进行了介绍。

从搭建环境开始,然后加载数据,探索数据,直到进行模型的训练与评估,最终进行未知数据的预测,即预测婴儿生存机会

image-20200513082303944

本文则来介绍如何使用ML机器学习库来实战ML!同样使用上一节的数据集来演示ML的构建过程。再次尝试预测婴儿的生存几率。

image-20200513081915541

image-20200513081948994

**Pipeline——**管道工作流

管道链接多个转换器和预测器生成一个机器学习工作流。

管道被指定为一系列阶段,每个阶段是一个转换器或一个预测器。

第一行表示一个Pipleline,包含三个阶段。其中Tokenizer和HashingTF是转换,第三个LogiticRegression逻辑回归是预测。下面一行代表流水线中的数据流,圆柱体表示DataFrame,

(1)对于Raw text文本数据和标签生成DataFrame,然后调用Pipeline的fit接口;

(2)调用Tokenizer的transform接口将文本进行分词,并将分词添加到DataFrame;

(3)pipleline调用LogiticRegression.fit产出一个LogiticRegressionModel。

Parameter:所有的Transformer和Estimator共享一个通用的指定参数的API。

Pipline的使用示例,会在ML构建机器学习的案例中体现出来。

1.加载数据

数据地址:https://pan.baidu.com/s/1RJIAR4em2L2XiQpZhBWOgg
提取码:yw39

from pyspark.ml import Pipeline
import pyspark.ml.classification as cl
import pyspark.ml.evaluation as ev
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.types as typ

labels = [('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
          ('BIRTH_PLACE', typ.StringType()),
          ('MOTHER_AGE_YEARS', typ.IntegerType()),
          ('FATHER_COMBINE_AGE', typ.IntegerType()),
          ('CIG_BEFORE', typ.IntegerType()),
          ('CIG_1_TRI', typ.IntegerType()),
          ('CIG_2_TRI', typ.IntegerType()),
          ('CIG_3_TRI', typ.IntegerType()),
          ('MOTHER_HEIGHT_IN', typ.IntegerType()),
          ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
          ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
          ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
          ('DIABETES_PRE', typ.IntegerType()),
          ('DIABETES_GEST', typ.IntegerType()),
          ('HYP_TENS_PRE', typ.IntegerType()),
          ('HYP_TENS_GEST', typ.IntegerType()),
          ('PREV_BIRTH_PRETERM', typ.IntegerType())
          ]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])

births = spark.read.csv(
    'data/births_transformed.csv', header=True, schema=schema)
births.show(3)

image-20200512114558911

2.创建转换器

1、先将births中的BIRTH_PLACE字段类型修改为数值类型

2、然后创建一个转换器 OneHotEncoder可以对数值类型的数据进行编码,从而转化为数值类型

3、创建一个单一的列,将所有的特征聚集到一起 该方法是一个列表(没有包含标签列),包含所有要组成outputCol的列,outputCol表示输出的列的名为'features'。

# 创建转换器
import  pyspark.ml.feature as ft

births = births.withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE']\
    .cast(typ.IntegerType()))

# birth place使用one-hot编码
encoder = ft.OneHotEncoder(inputCol='BIRTH_PLACE_INT',
                           outputCol='BIRTH_PLACE_VEC')

# 创建单一的列将所有特征整合在一起
featuresCreator = ft.VectorAssembler(
    inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features'
)

3.创建预测器

这里使用和上节一致的逻辑回归模型LogisticRegression,只不过来自pyspark.ml.classification模块。

在此我们还是先导入依赖,再创建模型。

#需要注意的是,如果数据的标签列的名称为label 则无需指定labelCol,如果featuresCre的输出不为'features',需要使用featuresCre调用getOutputColl()来指明featuresCol

# 预测模型性能
import pyspark.ml.classification as cl

logistic = cl.LogisticRegression(maxIter=10,
                                regParam=0.01,
                                featuresCol=featuresCreator.getOutputCol(),
                                labelCol='INFANT_ALIVE_AT_REPORT')
# 拟合模型
birth_train, birth_test = births.randomSplit([0.7,0.3],seed=123)
model = pipeline.fit(birth_train)
test_model = model.transform(birth_test)

import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

image-20200512114726407

4.创建管道

前面创建了两个转换器和一个预测器

现在需要做的事情是将两个转换器和一个预测器连接起来,放入一个管道中。

因此我们需要创建一个管道:依然是导入相应的模块,然后实例化一个管道实例。

代码创建一个管道,并其依次将转换器enco、featuresCre和预测器lr结合了起来

# 创建一个管道
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[encoder, featuresCreator, logistic])

5.训练模型

(在创建预测器的时候已经分好了,这一步要在预测用,可以看上面预测器的代码)
有了转换器、预测器和管道,我们就可以利用数据集进行训练模型了,

但是为了确保模型的说服力,在训练模型之前,需要把数据集拆分为训练集和测试集

其中训练集births_train用来训练模型、测试集births_test用来评估模型

训练模型时,训练集传递给enco转换器,enco转换器输出的DataFrame传递给featuresCre转换器,featuresCre转换器的输出为features列,features列再传递给预测器lr逻辑回归模型。

调用管道模型model对象的transform方法会获得预测值

# 拟合模型
birth_train, birth_test = births.randomSplit([0.7,0.3],seed=123)

model = pipeline.fit(birth_train)
test_model = model.transform(birth_test)

6.使用BinaryClassificationEvaluator对模型评估

此步依然使用BinaryClassificationEvaluator对模型评估

这一步骤与MLlib中基本相同。

先导入模型评估的模块、然后实例化BinaryClassificationEvaluator评估器

最后打印相关评估结果。

# 评估模型性能
import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderPR'}))

image-20200513083354819

7.模型保存与调用

包括管道的保存与加载、模型的保存与加载

这一步骤与MLlib中基本相同。

# 保存模型pipeline
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

# 重载模型pipeline
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(birth_train).transform(birth_test).take(1)

# 保存模型
from pyspark.ml import PipelineModel

modelPath = './infant_oneHotEncoder_LogisticPipelineModel'
model.write().overwrite().save(modelPath)

# 载入模型
loadedPipelineModel = PipelineModel.load(modelPath)
test_reloadedModel = loadedPipelineModel.transform(birth_test)
test_reloadedModel.take(1)

image-20200512114807432

通过本文的学习,大致了解了Spark.ML机器学习库的使用步骤:

包括加载数据、创建转换器和预测器、创建管道、训练模型、模型评估以及模型的保存与调用

ML与上一节MLlib的使用步骤主要区别在于预测器和管道的创建、这是ML特殊的地方。

而且从spark2.x版本开始人们更倾向于使用ML