pySpark 機(jī)器學(xué)習(xí)庫ml入門

在spark中提供了兩個(gè)機(jī)器學(xué)習(xí)庫mllib和ml,mllib的操作是基于RDD的,而ml則是基于DataFrame,是主流機(jī)器學(xué)習(xí)庫。

1、ml包的概述

ml包包括三個(gè)主要的抽象類:轉(zhuǎn)換器(Transformer)、評估器(Estimator)和管道(Pipeline)。

1.1 轉(zhuǎn)換器

轉(zhuǎn)換器類通過將一個(gè)新列附加到DataFrame來轉(zhuǎn)換數(shù)據(jù)。

從高層次上看,當(dāng)從轉(zhuǎn)換器的抽象類派生時(shí),每個(gè)新的轉(zhuǎn)換器類需要實(shí)現(xiàn).transform()方法。該方法要求傳遞一個(gè)要被轉(zhuǎn)換的DataFrame,該參數(shù)通常是第一個(gè)也是唯一的一個(gè)強(qiáng)制性參數(shù)。這在ml包的不同方法中也不盡相同:其他常見的參數(shù)有inputCol和outputCol;然而,這些參數(shù)通常用一些預(yù)定義的值作為默認(rèn)值,例如inputCol參數(shù)的默認(rèn)值為“features”。

spark.ml.feature中提供了許多轉(zhuǎn)換器,下面做個(gè)簡要介紹:

  • Binarizer, 根據(jù)指定的閾值將連續(xù)變量轉(zhuǎn)換為對應(yīng)的二進(jìn)制值;
  • Bucketizer, 與Binarizer類似,該方法根據(jù)閾值列表(分割的參數(shù)),將連續(xù)變量轉(zhuǎn)換為多項(xiàng)值(即將連續(xù)變量離散到指定的范圍區(qū)間);
  • ChiSqSelector, 對于分類目標(biāo)變量,此功能允許你選擇預(yù)定義數(shù)量的特征(由numTopFeatures參數(shù)指定),以便更好地說明目標(biāo)的變化。如名所示,該方法使用卡方檢驗(yàn)(Chi-Square)完成選擇。該方法需要兩步:首先,需要fit()數(shù)據(jù)。調(diào)用fit方法返回一個(gè)ChipSelectorModel對象,然后使用該對象的transform方法來轉(zhuǎn)換DataFrame;
  • countVectorizer, 該方法對于標(biāo)記文本(如[['learning', 'pyspark', 'with', 'us'], ['us', 'us', 'us', 'us']])是有用的。這是一個(gè)需要兩步的方法:首先,需要用fit從數(shù)據(jù)集中學(xué)習(xí)這些模式,然后才能使用fit方法返回的CountVectorizerModel對象的transform方法。對于如上所示的標(biāo)記文本,該轉(zhuǎn)換器的輸出類似于`[(4, [0, 1, 2, 3], [1.0, 1.0, 1.0, 1.0]), (4, [3], [3.0])]。
  • DCT, 離散余弦變換取實(shí)數(shù)值向量,并返回相同長度的向量,但余弦函數(shù)之和在不同頻率下振蕩。這種轉(zhuǎn)換對于提取數(shù)據(jù)或數(shù)據(jù)壓縮中的一些基本頻率很有用。
  • ElementwiseProduct, 該方法返回一個(gè)向量,其中的元素是傳入該方法的向量和另一個(gè)傳入?yún)?shù)scalingVec向量的乘積。例如,如果傳入的向量是[10.0, 3.0, 15.0], 而傳入的scalingVec為[0.99, 3.30, 0.66], 那么將獲得如下所示的向量:[9.9, 9.9, 9.9, 9.9];
  • HashingTF, 一個(gè)哈希轉(zhuǎn)換器,輸入為標(biāo)記文本的列表,返回一個(gè)帶有技術(shù)的有預(yù)定長度的向量。摘自pyspark文檔:"由于使用簡單的模數(shù)將散列函數(shù)轉(zhuǎn)換為列索引,建議使用2的冪作為numFeatures參數(shù);否則特征將不會均勻的映射到列";
  • IDF, 該方法計(jì)算文檔列表的逆向文件頻率。請注意,文檔需要提前用向量表示(例如,使用HashingTF或CountVectorizer);
  • IndexToString, 與StringIndexer方法對應(yīng)。它使用StringIndexerModel對象中的編碼將字符串索引反轉(zhuǎn)到原始值。另外請注意,如果有時(shí)不起作用,你需要指定StringIndexer中的值;
  • MaxAbsScaler, 將數(shù)據(jù)調(diào)整到[-1,0,1]范圍內(nèi)(因此不會移動數(shù)據(jù)中心);
  • MinMaxScaler, 這與MaxAbsScaler相似,區(qū)別在于它將數(shù)據(jù)縮放到[0.0, 1.0]范圍內(nèi);
  • NGram, 此方法的輸入為標(biāo)記文本的列表,返回結(jié)果包含一系列n-gram:以兩個(gè)詞、三個(gè)詞或更多的n個(gè)詞記為一個(gè)n-gram。例如,如果你有一個(gè)['good', 'morning', 'Robin', 'Williams'], 你會得到以下輸出:['good morning', 'morning Robin', 'Robin Williams'];
  • Normlizer, 該方法使用p范數(shù)將數(shù)據(jù)縮放為單位范數(shù)(默認(rèn)為L2);
  • OneHotEncoder, 該方法將分類列編碼為二進(jìn)制向量列;
  • PCA, 使用主成分分析執(zhí)行數(shù)據(jù)降維;
  • PolynomiaExpansion, 執(zhí)行向量的多項(xiàng)式展開。例如,加入你有一個(gè)如[x,y,z]的向量,則該方法將產(chǎn)生一下擴(kuò)展:[x, xx, y, xy, yy, z, xz, yz, zz];
  • QuantileDiscretizer, 與Bucketizer方法類似,但不是傳遞分隔參數(shù),而是傳遞一個(gè)numBuckets參數(shù)。然后,該方法通過計(jì)算數(shù)據(jù)的近似分位數(shù)來決定分隔應(yīng)該是什么;
  • RegexTokenizer, 這是一個(gè)使用正則表達(dá)式的字符串分詞器;
  • RFormula, 對于狂熱的R用戶,你可以傳遞一個(gè)公式,如vec ~ alpha*3 + beta (假設(shè)你的DataFrame具有alpha和beta列),它將產(chǎn)生給定表達(dá)式的vec列;
  • SQLTransformer, 與上面相似,但不是類似R的公式,你可以使用SQL語法(FROM 語句應(yīng)該從__THIS中選擇,表示你正在訪問DataFrame,如SELECT alpha*3 + beta AS vec FROM THIS);
  • StandardScaler, 標(biāo)準(zhǔn)化列,使其擁有零均值和等于1的標(biāo)準(zhǔn)差;
  • StopWordsRemover, 從標(biāo)記文本中刪除停用詞(如'the', 'a');
  • StringIndexer, 假設(shè)包含所有單詞的列表都在一列,這將產(chǎn)生一個(gè)索引向量;
  • Tokenizer (分詞器):該默認(rèn)分詞器將字符串轉(zhuǎn)成小寫,然后以空格為分隔符分詞;
  • VectorAssembler, 這是一個(gè)非常有用的轉(zhuǎn)換器,他將多個(gè)數(shù)字(包括向量)列合并為一列向量;
  • VectorIndexer, 該方法為類別列生成索引向量。它以逐列方式工作,從列中選擇不同的值,排序并從映射中返回值的索引而不是原始值;
  • VectorSlicer, 用于特征向量,不管是密集的還是稀疏的:給定一個(gè)索引列表,它從特征向量中提取值;
  • Word2Vec, 該方法將一個(gè)句子(字符串)作為輸入 ,并將其轉(zhuǎn)換為{string, vector}格式的映射,這種表示在自然語言處理中非常有用。
1.2 評估器

評估器可以被視為需要評估的統(tǒng)計(jì)模型,對你的觀測對象做預(yù)測或分類。

如果從抽象的評估器類派生,新模型必須實(shí)現(xiàn)fit方法,該方法給出的在DataFrame中找到的數(shù)據(jù)和某些默認(rèn)或自定義的參數(shù)來擬合模型。在pyspark中有很多評估器可用,下面簡要介紹下spark中提供的模型。

分類

ML包提供了七種分類(Classification)模型以供選擇,從最簡單的邏輯回歸到一些更復(fù)雜的模型,下面作簡要的描述:

  • LogisticRegression, 分類的基準(zhǔn)模型。邏輯回歸使用一個(gè)對數(shù)函數(shù)來計(jì)算屬于特定類別的觀察對象的概率;
  • DecisionTreeClassifier, 該分類器構(gòu)建了一個(gè)決策樹來預(yù)測一個(gè)觀察對象的所屬類別。指定maxDepth參數(shù)限制樹的深度,minInstancePerNode確定需要進(jìn)一步拆分的樹節(jié)點(diǎn)的觀察對象的最小數(shù)量,maxBins參數(shù)指定連續(xù)變量將被分割的Bin的最大數(shù)量,而impurity指定用于測量并計(jì)算來自分隔的信息的度量;
  • GBTClassifier, 用于分類的梯度提升決策樹模型。該模型屬于集合模型家族:集合模型結(jié)合多個(gè)弱預(yù)測模型而形成一個(gè)強(qiáng)健的模型;
  • RandomForestClassifier,該模型產(chǎn)生多個(gè)決策樹,并使用模式輸出的決策樹來對觀察對象進(jìn)行分類;
  • NaiveBayes, 基于貝葉斯定理,該模型使用條件概率理論對觀測進(jìn)行分類;
  • MultilayerPerceptronClassfier, 多層感知器分類器。模仿人類大腦本質(zhì)的分類器,深深植根于人造神經(jīng)網(wǎng)絡(luò)理論,該模型是一個(gè)黑盒模型,內(nèi)部參數(shù)不易解釋。該模型至少包含三個(gè)完全相連的人造神經(jīng)元層:輸入層(需要和數(shù)據(jù)集中特征的數(shù)量一樣)、多個(gè)隱藏層(至少一個(gè))以及一個(gè)輸出層,其神經(jīng)元數(shù)量等于標(biāo)簽中的類別數(shù)量。輸入層和隱藏層中的所有神經(jīng)元都有sigmoid激活函數(shù),而輸出神經(jīng)元的激活函數(shù)則為softmax。
  • OneVsRest,將多分類問題簡化為二分類問題。例如,在多標(biāo)簽的情況下,模型可以訓(xùn)練成多個(gè)二元邏輯回歸模型。如多標(biāo)簽情況下,模型可以訓(xùn)練成多個(gè)二元邏輯回歸模型。所有模型分別計(jì)分,具有最高概率的模型獲勝。
回歸

pyspark ML軟件包中有七種可用于回歸(Regression)任務(wù)的模型。與分類一樣,范圍從一些基本的回歸(如強(qiáng)制線性回歸)到更復(fù)雜的回歸:

  • AFTSurvivalRegression,適合加速失效時(shí)間回歸模型。它是一個(gè)參數(shù)化模型,假設(shè)其中一個(gè)特征的邊際效應(yīng)加速或減緩了預(yù)期壽命(或過程失敗)。它非常適用于具有明確階段的過程;
  • DecisionTreeRegressor, 類似于分類模型,明顯不同的是其標(biāo)簽是連續(xù)的而不是二元的;
  • GBTRegressor, 與DecisionTressRegressor一樣,區(qū)別在于標(biāo)簽的數(shù)據(jù)類型;
  • GeneralizedLinearRegression,廣義線性回歸是具有不同內(nèi)核功能(鏈接功能)的線性模型家族。與假設(shè)誤差項(xiàng)的常態(tài)性的線性回歸相反,GLM允許標(biāo)簽具有不同的誤差分布項(xiàng):pyspark ML包的generalizedRegression模型支持gaussian、binomial、gamma、和possion家族的誤差分布,他們有許多不同的連接功能;
  • IsotonicRegression, 這種回歸擬合一個(gè)形式自由的、非遞減的行到數(shù)據(jù)中。對于擬合有序的和遞增的觀測數(shù)據(jù)集是有用的;
  • LinearRgression, 最簡單的回歸模型,他架設(shè)了特征與連續(xù)標(biāo)簽以及誤差項(xiàng)的常態(tài)之間的線性關(guān)系;
  • RandomForestRegressor, 與DecisionTreeRegressor或GBTRegressor類似,RandomForestRegressor適合連續(xù)的標(biāo)簽,而不是離散標(biāo)簽。
聚類

聚類是一系列無監(jiān)督模型,用于查找數(shù)據(jù)中的隱含模式。pyspark ML包提供了四種當(dāng)前最流行的模型:

  • BisectingKMeans, 二分k均值算法,該算法結(jié)合了k均值聚類算法和層次聚類算法。最初該算法將所有觀察點(diǎn)作為一個(gè)簇,然后將數(shù)據(jù)迭代的分解為k個(gè)簇;
  • KMeans, K均值算法,將數(shù)據(jù)分成k個(gè)簇,迭代地搜索那些使每個(gè)觀察點(diǎn)和它所屬簇的質(zhì)點(diǎn)之間距離平方和最小的那些質(zhì)點(diǎn);
  • GaussianMixture, 混合高斯模型。該方法使用具有未知參數(shù)的k個(gè)高斯分布來剖析數(shù)據(jù)集。使用期望最大化算法,通過最大化對數(shù)似然函數(shù)找到高斯參數(shù);
  • LDA, 該模型用于自然語言處理應(yīng)用程序中的主題生成;
  • 除此之外,pyspark ML還提供了推薦模型。
1.3 管道

pyspark ML中管道的概念用來表示從轉(zhuǎn)換到評估(具有一系列不同階段)的端到端的過程,這個(gè)過程可以對輸入的一些原始數(shù)據(jù)(以DataFrame形式)執(zhí)行必要的數(shù)據(jù)加工(轉(zhuǎn)換),最后評估模型。

一個(gè)管道可以被認(rèn)為是由一系列不同階段組成的。在Pipeline對象上執(zhí)行fit方法時(shí),所有階段按照stage參數(shù)中指定的順序執(zhí)行;stage參數(shù)是轉(zhuǎn)換器和評估器對象的列表。管道對象的fit方法執(zhí)行每個(gè)轉(zhuǎn)換器的transform方法和所有評估器的fit方法。

通常,前一階段的輸出會成為下一階段的輸入:當(dāng)從轉(zhuǎn)換器或評估器抽象類型派生時(shí),需要實(shí)現(xiàn)getOutputCol()方法,該方法返回創(chuàng)建對象時(shí)指定的outputCol參數(shù)的值。

下面通過一些例子來詳細(xì)介紹ml的用法。

2、例子:使用ML預(yù)測嬰兒生存幾率

在本節(jié)中我們將使用ml中的函數(shù)方法來預(yù)測嬰兒生存率,數(shù)據(jù)可從http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz下載。

2.1 加載數(shù)據(jù)
import pyspark.sql.types as typ
from pyspark.ml import Pipeline
import pyspark.ml.classification as cl
import pyspark.ml.evaluation as ev
import pandas as pd
import numpy as np
import os

labels = [('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
          ('BIRTH_PLACE', typ.StringType()),
          ('MOTHER_AGE_YEARS', typ.IntegerType()),
          ('FATHER_COMBINE_AGE', typ.IntegerType()),
          ('CIG_BEFORE', typ.IntegerType()),
          ('CIG_1_TRI', typ.IntegerType()),
          ('CIG_2_TRI', typ.IntegerType()),
          ('CIG_3_TRI', typ.IntegerType()),
          ('MOTHER_HEIGHT_IN', typ.IntegerType()),
          ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
          ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
          ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
          ('DIABETES_PRE', typ.IntegerType()),
          ('DIABETES_GEST', typ.IntegerType()),
          ('HYP_TENS_PRE', typ.IntegerType()),
          ('HYP_TENS_GEST', typ.IntegerType()),
          ('PREV_BIRTH_PRETERM', typ.IntegerType())
          ]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])

births = spark.read.csv(
    '/Users/shexuan/Downloads/births_transformed.csv.gz', header=True, schema=schema)

births.show(3)
+----------------------+-----------+----------------+------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINE_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+----------------------+-----------+----------------+------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|                     0|          1|              29|                99|         0|        0|        0|        0|              99|              999|                   999|                99|           0|            0|           0|            0|                 0|
|                     0|          1|              22|                29|         0|        0|        0|        0|              65|              180|                   198|                18|           0|            0|           0|            0|                 0|
|                     0|          1|              38|                40|         0|        0|        0|        0|              63|              155|                   167|                12|           0|            0|           0|            0|                 0|
+----------------------+-----------+----------------+------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+

在這里我們指定DataFrame的schema,限制數(shù)據(jù)集只有17列。

2.2 創(chuàng)建轉(zhuǎn)換器

在使用模型對數(shù)據(jù)集進(jìn)行評估預(yù)測前,先要對數(shù)據(jù)做一些特征轉(zhuǎn)換。

# 創(chuàng)建轉(zhuǎn)換器、評估器
import  pyspark.ml.feature as ft

births = births.withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE']\
    .cast(typ.IntegerType()))

# birth place使用one-hot編碼
encoder = ft.OneHotEncoder(inputCol='BIRTH_PLACE_INT',
                           outputCol='BIRTH_PLACE_VEC')

# 創(chuàng)建單一的列將所有特征整合在一起
featuresCreator = ft.VectorAssembler(
    inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features'
)

# 創(chuàng)建一個(gè)評估器
import pyspark.ml.classification as cl

logistic = cl.LogisticRegression(maxIter=10,
                                regParam=0.01,
                                featuresCol=featuresCreator.getOutputCol(),
                                labelCol='INFANT_ALIVE_AT_REPORT')
2.3 創(chuàng)建一個(gè)管道、擬合模型

在前面我們已經(jīng)創(chuàng)建了數(shù)據(jù)轉(zhuǎn)換器和評估器,現(xiàn)在我們可以通過管道將他們串聯(lián)起來并方便的進(jìn)行模型擬合了。

# 創(chuàng)建一個(gè)管道
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[encoder, featuresCreator, logistic])

# 擬合模型
birth_train, birth_test = births.randomSplit([0.7,0.3],seed=123)

model = pipeline.fit(birth_train)
test_model = model.transform(birth_test)
2.4 評估模型

在前面我們將數(shù)據(jù)分為了兩部分并通過管道方便的對訓(xùn)練集進(jìn)行了擬合以及對測試集進(jìn)行了測試。現(xiàn)在我們可以通過測試集的結(jié)果來對模型擬合效果進(jìn)行評估了。

# 評估模型性能
import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderPR'}))

0.7187355793173213
0.6819691176245866
2.5 保存模型

PySpark不僅允許保存訓(xùn)練好的模型,還可以保存管道結(jié)構(gòu)及所有轉(zhuǎn)換器和評估器的定義。

# 保存模型pipeline
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

# 重載模型pipeline
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(birth_train).transform(birth_test).take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', ...]

# 保存模型
from pyspark.ml import PipelineModel

modelPath = './infant_oneHotEncoder_LogisticPipelineModel'
model.write().overwrite().save(modelPath)

# 載入模型
loadedPipelineModel = PipelineModel.load(modelPath)
test_reloadedModel = loadedPipelineModel.transform(birth_test)
test_reloadedModel.take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=12, ...]
2.6 超參調(diào)優(yōu)

我們的第一個(gè)模型幾乎不可能是最好的模型。利用超參調(diào)優(yōu)能幫我們找到模型的最佳參數(shù),如邏輯回歸模型所需的最佳迭代次數(shù)或決策樹的最大深度。

在超參調(diào)優(yōu)時(shí)PySpark提供了兩種驗(yàn)證方法:K-Fold交叉驗(yàn)證和train-validation(相當(dāng)于1-Fold)交叉驗(yàn)證。

# 超參調(diào)優(yōu):grid search和train-validation splitting 

# 網(wǎng)格搜索
import pyspark.ml.tuning as tune

logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT')
grid = tune.ParamGridBuilder()\
    .addGrid(logistic.maxIter, [5,10,50])\
    .addGrid(logistic.regParam, [0.01,0.05,0.3])\
    .build()

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

# 使用K-Fold交叉驗(yàn)證評估各種參數(shù)的模型
cv = tune.CrossValidator(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3
)

# 創(chuàng)建一個(gè)構(gòu)建特征的pipeline
pipeline = Pipeline(stages=[encoder, featuresCreator])
birth_train, birth_test = births.randomSplit([0.7,0.3],seed=123) # 重新打開數(shù)據(jù)進(jìn)行處理
data_transformer = pipeline.fit(birth_train)
data_test = data_transformer.transform(birth_test)


# cvModel 返回估計(jì)的最佳模型
cvModel = cv.fit(data_transformer.transform(birth_train))
results = cvModel.transform(data_test)

print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'}))

0.735848884034915
0.6959036715961695

使用下面的代碼可以查看模型最佳參數(shù):

# 查看最佳模型參數(shù)
param_maps = cvModel.getEstimatorParamMaps()
eval_metrics = cvModel.avgMetrics

param_res = []

for params, metric in zip(param_maps, eval_metrics):
    param_metric = {}
    for key, param_val in zip(params.keys(), params.values()):
        param_metric[key.name]=param_val
    param_res.append((param_metric, metric))

sorted(param_res, key=lambda x:x[1], reverse=True)

[({'maxIter': 50, 'regParam': 0.01}, 0.7406291618177623),
 ({'maxIter': 10, 'regParam': 0.01}, 0.735580969909943),
 ({'maxIter': 50, 'regParam': 0.05}, 0.7355100622938429),
 ({'maxIter': 10, 'regParam': 0.05}, 0.7351586303619441),
 ({'maxIter': 10, 'regParam': 0.3}, 0.7248698034708339),
 ({'maxIter': 50, 'regParam': 0.3}, 0.7214679272915997),
 ({'maxIter': 5, 'regParam': 0.3}, 0.7180255703028883),
 ({'maxIter': 5, 'regParam': 0.01}, 0.7179304617840288),
 ({'maxIter': 5, 'regParam': 0.05}, 0.7173397593133481)]

上面使用的使用K-Fold來進(jìn)行超參調(diào)優(yōu),K-Fold交叉驗(yàn)證往往非常耗時(shí),使用1-Fold的交叉驗(yàn)證(即將數(shù)據(jù)集按比例分為訓(xùn)練集合驗(yàn)證集)能大大縮短時(shí)間。

# Train-validation劃分

# 使用卡方檢驗(yàn)選擇特征
selector = ft.ChiSqSelector(
    numTopFeatures=5,
    featuresCol=featuresCreator.getOutputCol(),
    outputCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT',
                                featuresCol='selectedFeatures')

pipeline = Pipeline(stages=[encoder, featuresCreator, selector])
data_transformer = pipeline.fit(birth_train)

tvs = tune.TrainValidationSplit(estimator=logistic,
                               estimatorParamMaps=grid,
                               evaluator=evaluator,
                                trainRatio=0.75
                               )

tvsModel = tvs.fit(data_transformer.transform(birth_train))
data_test = data_transformer.transform(birth_test)
results = tvsModel.transform(data_test)

print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'}))

0.6111344483529891
0.5735913338089571

3、使用PySpark ML的其他功能

在上面我們完整的介紹了利用pyspark ml庫來進(jìn)行建模的過程。下面我們介紹一些其他常用的功能。

3.1 特征提取
3.1.1 NLP相關(guān)特征提取

如第一部分所述,NGram模型采用標(biāo)記文本的列表,并生成單詞對(或n-gram)。
本例中,我們從pyspark的文檔中摘錄一段,并介紹如何在將文本傳遞給NGram模型之前進(jìn)行清理。

# NLP相關(guān)特征提?。∟Gram模型采用標(biāo)記文本的列表,并生成單詞對或n-gram)

text_data = spark.createDataFrame([
    ['''K-fold cross validation performs model selection by splitting the dataset into a set of non-overlapping 
    randomly partitioned folds which are used as separate training and test datasets e.g., with k=3 folds, 
    K-fold cross validation will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data 
    for training and 1/3 for testing. Each fold is used as the test set exactly once.'''],
    ['''CrossValidatorModel contains the model with the highest average cross-validation metric across folds and
    uses this model to transform input data. CrossValidatorModel also tracks the metrics for each param map 
    evaluated.'''],
    ['''Creates a copy of this instance with a randomly generated uid and some extra params. This copies the 
    underlying bestModel, creates a deep copy of the embedded paramMap, and copies the embedded and extra 
    parameters over.''']
], ['input'])

# 將文本拆分成單詞
tokenizer = ft.RegexTokenizer(inputCol='input',
                              outputCol='input_arr',
                              pattern='\s+|[,.\"]')

# 刪掉停用詞
stopwords = ft.StopWordsRemover(inputCol=tokenizer.getOutputCol(),
                               outputCol='input_stop')
# 生成ngram詞對
ngram = ft.NGram(n=2,
                inputCol=stopwords.getOutputCol(),
                outputCol='nGrams')

# 構(gòu)建特征pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords, ngram])

data_ngram = pipeline\
    .fit(text_data)\
    .transform(text_data)

data_ngram.show()

+--------------------+--------------------+--------------------+--------------------+
|               input|           input_arr|          input_stop|              nGrams|
+--------------------+--------------------+--------------------+--------------------+
|K-fold cross vali...|[k-fold, cross, v...|[k-fold, cross, v...|[k-fold cross, cr...|
|CrossValidatorMod...|[crossvalidatormo...|[crossvalidatormo...|[crossvalidatormo...|
|Creates a copy of...|[creates, a, copy...|[creates, copy, i...|[creates copy, co...|
+--------------------+--------------------+--------------------+--------------------+
3.1.2 離散連續(xù)變量

我們常常需要處理高度非線性連續(xù)特征,很難只用一個(gè)系數(shù)來供給模型。這種情況下,可能難以用一個(gè)系數(shù)來解釋這樣的特征與目標(biāo)之間的關(guān)系。有時(shí)候,將值劃分成分類級別是很有用的。

# 離散連續(xù)變量
x = np.arange(0, 100)
x = (x/100.0)*np.pi*4
y = x*np.sin(x/1.764)+20.1234

schema = typ.StructType([typ.StructField('continuous_var', typ.DoubleType(), nullable=False)])
data = spark.createDataFrame([[float(e)] for e in y], schema=schema)
data.show(4)
+------------------+
|    continuous_var|
+------------------+
|           20.1234|
|20.132344452369832|
|20.159087064491775|
|20.203356291885854|
+------------------+


# 使用QuantileDiscretizer模型將連續(xù)變量分為五個(gè)分類級別
discretizer = ft.QuantileDiscretizer(
    numBuckets=5, 
    inputCol='continuous_var',
    outputCol='discritized'
)

data_discretized = discretizer.fit(data).transform(data)
data_discretized.show(5,truncate=False)
+------------------+-----------+
|continuous_var    |discritized|
+------------------+-----------+
|20.1234           |2.0        |
|20.132344452369832|2.0        |
|20.159087064491775|2.0        |
|20.203356291885854|2.0        |
|20.26470185735763 |2.0        |
+------------------+-----------+
3.1.3 標(biāo)準(zhǔn)化連續(xù)變量

標(biāo)準(zhǔn)化連續(xù)變量不僅有助于更好地理解特征之間的關(guān)系,而且還有助于計(jì)算效率,并防止運(yùn)行到某些數(shù)字陷阱。

# 標(biāo)準(zhǔn)化連續(xù)變量
# 首先,要?jiǎng)?chuàng)建一個(gè)向量代表連續(xù)變量(因?yàn)樗皇且粋€(gè)float)
vectorizer = ft.VectorAssembler(inputCols=['continuous_var'],
                               outputCol='continuous_vec')

normlizer = ft.StandardScaler(inputCol=vectorizer.getOutputCol(),
                             outputCol='normlized',
                             withMean=True,
                             withStd=True)

pipeline = Pipeline(stages=[vectorizer, normlizer])
data_standardized = pipeline.fit(data).transform(data)
data_standardized.show(4)
+------------------+--------------------+--------------------+
|    continuous_var|      continuous_vec|           normlized|
+------------------+--------------------+--------------------+
|           20.1234|           [20.1234]|[0.23429139554502...|
|20.132344452369832|[20.132344452369832]|[0.23630959828688...|
|20.159087064491775|[20.159087064491775]|[0.24234373105179...|
|20.203356291885854|[20.203356291885854]|[0.25233252325644...|
+------------------+--------------------+--------------------+
3.2 聚類

在前面的例子中我們介紹了如何使用pyspark ml庫來擬合分類模型,在本節(jié)我們將簡單介紹pyspark ml庫中的聚類模型。

聚類是機(jī)器學(xué)習(xí)中的另一個(gè)重要組成部分:通常在現(xiàn)實(shí)世界中,我們沒有那么幸運(yùn)具有目標(biāo)特征,所以需要回到一個(gè)無監(jiān)督的學(xué)習(xí)范例,來試圖從中發(fā)掘數(shù)據(jù)內(nèi)的模式。

# 聚類
# 使用Kmeans模型在出生數(shù)據(jù)中查找相似性
import pyspark.ml.clustering as clus

kmeans = clus.KMeans(k=5, featuresCol='features')
pipeline = Pipeline(stages=[encoder, featuresCreator, kmeans])
model = pipeline.fit(birth_train)

test = model.transform(birth_test)
test.groupby('prediction')\
    .agg({'*':'count',
         'MOTHER_HEIGHT_IN':'avg'})\
    .collect()

[Row(prediction=1, avg(MOTHER_HEIGHT_IN)=67.43708609271523, count(1)=453),
 Row(prediction=3, avg(MOTHER_HEIGHT_IN)=67.65714285714286, count(1)=245),
 Row(prediction=4, avg(MOTHER_HEIGHT_IN)=63.92423385728825, count(1)=8843),
 Row(prediction=2, avg(MOTHER_HEIGHT_IN)=84.97315436241611, count(1)=447),
 Row(prediction=0, avg(MOTHER_HEIGHT_IN)=65.45034965034965, count(1)=3575)]
3.3 回歸

上面已經(jīng)介紹過了分類和聚類模型,最后我們再來簡單介紹一下回歸模型。
在本節(jié)中,我們將嘗試用給定的一些特征來預(yù)測MOTHER_WEIGHT_GAIN。

# 回歸
# 使用梯度提升決策樹來預(yù)測增加的體重
import pyspark.ml.regression as reg

features = ['MOTHER_AGE_YEARS', 'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT',
            'DIABETES_PRE', 'DIABETES_GEST', 'HYP_TENS_PRE', 'HYP_TENS_GEST',
            'PREV_BIRTH_PRETERM', 'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI',
            'CIG_3_TRI']

featuresCreator = ft.VectorAssembler(
    inputCols=[col for col in features[1:]],
    outputCol='features'
)

# 這里使用卡方檢驗(yàn)選擇前六個(gè)最重要的特征
selector = ft.ChiSqSelector(numTopFeatures=6,
                            outputCol='selectedFeatures',
                            labelCol='MOTHER_WEIGHT_GAIN')

regressor = reg.GBTRegressor(maxIter=15,
                            maxDepth=3,
                            labelCol='MOTHER_WEIGHT_GAIN')

pipeline = Pipeline(stages=[featuresCreator, selector, regressor])
weight_gain = pipeline.fit(birth_train)

# 測試集評估
evaluator = ev.RegressionEvaluator(predictionCol='prediction',labelCol='MOTHER_WEIGHT_GAIN')
print(evaluator.evaluate(weight_gain.transform(birth_test),
                        {evaluator.metricName:'r2'}))

0.49363823556949404

雖然模型結(jié)果不太好,但是我們的重點(diǎn)不在這里,我們只是為了了解pyspark ml庫中回歸模型的用法。

參考:

《pyspark 實(shí)戰(zhàn)指南:利用python和spark構(gòu)建數(shù)據(jù)密集型應(yīng)用并規(guī)?;渴稹?/p>

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容