做網(wǎng)站什么字體網(wǎng)絡(luò)營銷課程感悟
課程資源:(林子雨)Spark編程基礎(chǔ)(Python版)_嗶哩嗶哩_bilibili
第8章 Spark MLlib(6節(jié))
機(jī)器學(xué)習(xí)算法庫
(一)MLlib簡介
1、機(jī)器學(xué)習(xí)
機(jī)器學(xué)習(xí)可以看做是一門人工智能的科學(xué),該領(lǐng)域的主要研究對(duì)象是人工智能。機(jī)器學(xué)習(xí)利用數(shù)據(jù)或以往的經(jīng)驗(yàn),優(yōu)化計(jì)算機(jī)程序的性能標(biāo)準(zhǔn)。強(qiáng)調(diào)三個(gè)關(guān)鍵詞:算法、經(jīng)驗(yàn)、性能
- 模型:用數(shù)據(jù)對(duì)算法進(jìn)行訓(xùn)練后得到的
- 算法:區(qū)別于模型
(1)傳統(tǒng)機(jī)器學(xué)習(xí)算法
由于技術(shù)和單機(jī)存儲(chǔ)的限制,只能在少量數(shù)據(jù)上使用,依賴于數(shù)據(jù)抽樣
(2)Spark機(jī)器學(xué)習(xí)
大數(shù)據(jù)技術(shù)的出現(xiàn)可以支持在全量數(shù)據(jù)上進(jìn)行機(jī)器學(xué)習(xí)
- 使用MapReduce對(duì)機(jī)器學(xué)習(xí)算法進(jìn)行編寫:MapReduce是基于磁盤的計(jì)算框架,機(jī)器學(xué)習(xí)算法涉及大量迭代計(jì)算,涉及反復(fù)讀寫磁盤的開銷,有磁盤IO開銷比較大的缺陷
- 而Spark是基于內(nèi)存的計(jì)算框架,由于DAG機(jī)制避免頻繁讀寫磁盤開銷,適合大量迭代計(jì)算
2、MLlib-機(jī)器學(xué)習(xí)庫
(1)簡介?
提供了常用機(jī)器學(xué)習(xí)算法的分布式實(shí)現(xiàn)。且PySpark的即席查詢也是一個(gè)關(guān)鍵,算法工程師邊寫代碼、邊運(yùn)行、邊看結(jié)果
不是所有的機(jī)器學(xué)習(xí)算法都能用在Spark中,有的算法無法做成并行
- MLlib中只包含能夠在集群上運(yùn)行良好的并行算法,有些經(jīng)典的機(jī)器學(xué)習(xí)算法沒有包含在其中,因?yàn)樗鼈儾荒懿⑿袌?zhí)行
- 相反地,一些較新研究得出的算法因?yàn)檫m用于集群,也被包含在MLlib中,例如分布式隨機(jī)森林算法、最小交替二乘算法。這樣的選擇使得MLlib中的每一個(gè)算法都適用于大規(guī)模數(shù)據(jù)集
- 如果是小規(guī)模數(shù)據(jù)集上訓(xùn)練各機(jī)器學(xué)習(xí)模型,最好還是在各個(gè)節(jié)點(diǎn)上使用單節(jié)點(diǎn)的機(jī)器學(xué)習(xí)算法庫(比如Weka)
(2)內(nèi)容?
MLlib是Spark機(jī)器學(xué)習(xí)庫,旨在簡化機(jī)器學(xué)習(xí)的工程實(shí)踐工作
- 算法工具:分類、回歸、聚類、協(xié)同過濾
- 特征化工具:特征提取、轉(zhuǎn)換、降維、選擇
- 流水線(Pineline)工具:構(gòu)建+評(píng)估,調(diào)整機(jī)器學(xué)習(xí)工作流
- 持久性:保存、加載算法、模型、管道
- 實(shí)用性工具:線性代數(shù)、統(tǒng)計(jì)、數(shù)據(jù)處理
(3)與 spark.ml 的區(qū)別
是機(jī)器學(xué)習(xí)庫不同的包
- spark.mllib 是基于RDD的數(shù)據(jù)抽象,包含基于RDD的原始算法API。在1.0以前的版本已經(jīng)包含,提供的算法實(shí)現(xiàn)都是基于原始RDD
- spark.ml 是基于DataFrame的數(shù)據(jù)抽象,提供基于DataFrame高層次的API,可以用來構(gòu)建機(jī)器學(xué)習(xí)工作流Pipeline(與Spark SQL完美融合),彌補(bǔ)了原始mllib庫的不足,向用戶提供了一個(gè)基于DataFrame的機(jī)器學(xué)習(xí)工作流式API套件
(二)機(jī)器學(xué)習(xí)流水線
1、概念
(1)DataFrame:結(jié)構(gòu)化數(shù)據(jù)封裝
使用Spark SQL中的DataFrame作為數(shù)據(jù)集,可以容納各種數(shù)據(jù)類型。較之RDD,DataFrame包含了schema信息,更類似傳統(tǒng)數(shù)據(jù)庫中的二維表格。它被ML Pipeline用來存儲(chǔ)源數(shù)據(jù),例如,DataFrame中的列可以是存儲(chǔ)的文本、特征向量、真實(shí)標(biāo)簽和預(yù)測標(biāo)簽等
(2)轉(zhuǎn)換器:Transformer
將一個(gè)DataFrame轉(zhuǎn)換為另一個(gè)DataFrame。比如一個(gè)模型就是一個(gè)Transformer,它可以把一個(gè)不包含預(yù)測標(biāo)簽的測試數(shù)據(jù)集DataFrame打上標(biāo)簽,轉(zhuǎn)換成另一個(gè)包含預(yù)測標(biāo)簽的DataFrame。技術(shù)上,Transformer實(shí)現(xiàn)了一個(gè)方法 transform(),它通過附加一個(gè)或多個(gè)列,將一個(gè)DataFrame轉(zhuǎn)換為另一個(gè)DataFrame
(3)評(píng)估器(算法):Estimator
用數(shù)據(jù)對(duì)評(píng)估器訓(xùn)練得到模型,調(diào)用 .fit(DataFrame) 即可。它是學(xué)習(xí)算法或在訓(xùn)練數(shù)據(jù)上的訓(xùn)練方法的概念抽象,在Pipeline里通常是被用來操作DataFrame數(shù)據(jù)并生成一個(gè)Transformer。從技術(shù)上,Estimator實(shí)現(xiàn)了一個(gè)方法fit(),它接收一個(gè)DataFrame并產(chǎn)生一個(gè)轉(zhuǎn)換器。比如,一個(gè)隨機(jī)森林算法就是一個(gè)Estimator,它可以調(diào)用fit(),通過訓(xùn)練特征數(shù)據(jù)得到一個(gè)隨機(jī)森林模型
(4)參數(shù):Parameter
被用來設(shè)置Transformer或Estimator的參數(shù)。所有轉(zhuǎn)換器和評(píng)估器可共享用于指定參數(shù)的公共API。ParamMap是一組 (參數(shù), 值) 對(duì)?
(5)流水線/管道:PipeLine
將多個(gè)工作流階段(即轉(zhuǎn)換器和評(píng)估器)連接起來形成機(jī)器學(xué)習(xí)工作流并獲得輸出結(jié)果
2、構(gòu)建
- 定義Pipeline中的各個(gè)流水線階段PipelineStage(包含轉(zhuǎn)換器、評(píng)估器)
- 按照處理邏輯,轉(zhuǎn)換器和評(píng)估器有序地組織起來構(gòu)建成Pipeline
把訓(xùn)練數(shù)據(jù)集作為輸入?yún)?shù),調(diào)用fit()方法,返回一個(gè)PipelineModel類實(shí)例,輸出被用來預(yù)測測試數(shù)據(jù)的標(biāo)簽?
pipeline = Pipeline(stages = [stage1, stage2, stage3])
流水線各階段運(yùn)行,輸入的DataFrame在它通過每個(gè)階段時(shí)被轉(zhuǎn)換:
- Tokenizer:分詞
- HashingTF:把單詞轉(zhuǎn)換為特征向量
一個(gè)流水線,若一開始就包含了算法或評(píng)估器,那么它整體就是評(píng)估器,就可以調(diào)用 .fit() 對(duì)流水線進(jìn)行訓(xùn)練,得到流水線模型PipelineModel。即:流水線本身也可以看做是一個(gè)評(píng)估器,在流水線的fit()方法運(yùn)行之后,它產(chǎn)生一個(gè)PipelineModel,是一個(gè)Transformer,這個(gè)管道模型將在測試數(shù)據(jù)的時(shí)候使用
3、邏輯斯蒂回歸案例
任務(wù):查找所有包含Spark的句子,1即包含Spark,0即沒有包含Spark
- 使用SparkSession對(duì)象(Spark2.0以上版本,PySpark在啟動(dòng)時(shí)會(huì)自動(dòng)創(chuàng)建名為spark的SparkSession對(duì)象;但在編寫?yīng)毩⒋a時(shí)需自己生成)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WordCount").master("local").getOrCreate()
# SparkSession由其伴生對(duì)象的builder()方法創(chuàng)建
- pyspark.ml 依賴numpy包,Ubuntu自帶Python3是沒有numpy的,執(zhí)行命令安裝:sudo pip3 install numpy
(1)引入要包含的包并構(gòu)建訓(xùn)練數(shù)據(jù)集
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml import Pipelinetraining = spark.createDataFrame([(0, "a b c d e spark", 1.0),(1, "b d", 0.0),(2, "spark f g h", 1.0),(3, "hadoop mapreduce", 0.0)],["id", "text", "label"])
(2)定義Pipeline中各個(gè)流水線階段PipelineStage
每個(gè)階段是一個(gè)評(píng)估器或轉(zhuǎn)換器

tokenizer = Tokenizer(inputCol="text", outputCol="words") # 分詞器,words列是新生成的,會(huì)追加到DataFrame中
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
(3)按照處理邏輯有序地組織PipelineStage,創(chuàng)建Pipeline
pipline = Pipeline(stages=[tokenizer, hashingTF, lr]) # 現(xiàn)在構(gòu)建的Pipeline本質(zhì)上是一個(gè)estimator,在它的fit()方法運(yùn)行后,將產(chǎn)生一個(gè)PipelineModel,它是一個(gè)Transformer
model = pipline.fit(training) # model類型是一個(gè)PipelineModel,這個(gè)流水線模型將在測試數(shù)據(jù)的時(shí)候使用
(4)構(gòu)建測試數(shù)據(jù)
test = spark.createDataFrame([(4, "spark i j k"),(5, "l m n"),(6, "spark hadoop spark"),(7, "apache hadoop")], ["id", "text"] # 不包含label列)
(5)生成預(yù)測結(jié)果
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction") # probability屬于0/1的概率for row in selected.collect():rid, text, prob, prediction = row# %d代表int占位符,%s代表字符串占位符,%f代表浮點(diǎn)數(shù)占位符print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
(三)特征抽取:TF-IDF
1、TF-IDF(詞頻-逆向文件頻率)?
文本挖掘中使用的特征向量化方法,體現(xiàn)一個(gè)文檔中的詞語在語料庫中的重要程度
在Spark中,TF-IDF被分為兩個(gè)部分:
- TF(轉(zhuǎn)換器):HashingTF(哈希)。接收詞條的集合,把這些集合轉(zhuǎn)化成固定長度的特征向量,這個(gè)算法在哈希的同時(shí)會(huì)統(tǒng)計(jì)各個(gè)詞條的詞頻
- IDF(評(píng)估器):在一個(gè)數(shù)據(jù)集上應(yīng)用 fit() 方法,產(chǎn)生一個(gè)IDFModel。該IDFModel接收特征向量(由HashingTF產(chǎn)生),計(jì)算每一個(gè)詞在文檔中出現(xiàn)的頻次。IDF會(huì)減少那些在語料庫中出現(xiàn)頻率較高的詞的權(quán)重(因?yàn)檫@些詞的區(qū)分度低,不重要)
2、代碼?
從一組句子開始,首先使用分詞器Tokenizer把句子劃分為單個(gè)詞語,對(duì)每一個(gè)句子(詞袋)使用HashingTF將句子轉(zhuǎn)換為特征向量,最后使用IDF重新調(diào)整特征向量,以體現(xiàn)每個(gè)單詞真正的重要性
# 導(dǎo)入TF-IDF所需包
from pyspark.ml.feature import HashingTF, IDF, Tokenizer# 創(chuàng)建一個(gè)DataFrame,每一個(gè)句子代表一個(gè)文檔
sentenceData = spark.createDataFrame([(0, "I heard about Spark and I love Spark"),(0, "I wish Java could use case classes"),(1, "Logistic regression models are neat")]).toDF("label", "sentence")# 得到文檔集合后即可用tokenizer對(duì)句子進(jìn)行分詞
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData) # 轉(zhuǎn)換后新增一列words,為分詞結(jié)果
wordsData.show()+-----+--------------------+--------------------+
|label| sentence| words|
+-----+--------------------+--------------------+
| 0|I heard about Spa...|[i, heard, about,...|
| 0|I wish Java could...|[i, wish, java, c...|
| 1|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+
?每次轉(zhuǎn)換 .transform() 就會(huì)不斷增加新的列
# 使用HashingTF的transform()把句子哈希成特征向量
hashingTF = HashingTF(inputCol="words", outputCol="rawfeatures", numFeatures=2000) # 設(shè)置哈希表的桶數(shù)為2000
featurizedData = hashingTF.transform(wordsData)
featurizedData.select("words", "rawfeatures").show(truncate = False)+---------------------------------------------+---------------------------------------------------------------------+
|words |rawfeatures |
+---------------------------------------------+---------------------------------------------------------------------+
|[i, heard, about, spark, and, i, love, spark]|(2000,[240,673,891,956,1286,1756],[1.0,1.0,1.0,1.0,2.0,2.0]) |
|[i, wish, java, could, use, case, classes] |(2000,[80,342,495,1133,1307,1756,1967],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|[logistic, regression, models, are, neat] |(2000,[286,763,1059,1604,1871],[1.0,1.0,1.0,1.0,1.0]) |
+---------------------------------------------+---------------------------------------------------------------------+
?(2000,[240,673,891,956,1286,1756],[1.0,1.0,1.0,1.0,2.0,2.0])
- 2000個(gè)哈希桶
- 240表示單詞i被扔到了第240個(gè)哈希桶
- 1.0表示對(duì)應(yīng)單詞的出現(xiàn)次數(shù)
# 使用IDF評(píng)估器來對(duì)單純的詞頻特征向量進(jìn)行構(gòu)造
idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(featurizedData) # 對(duì)評(píng)估器進(jìn)行訓(xùn)練# 調(diào)用IDFModel的transform()方法調(diào)權(quán)重
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("features", "label").show(truncate = False)
(四)特征轉(zhuǎn)換:Word2Vec(標(biāo)簽和索引的轉(zhuǎn)換)
在機(jī)器學(xué)習(xí)處理過程中,為了方便相關(guān)算法的實(shí)現(xiàn),經(jīng)常需要把標(biāo)簽數(shù)據(jù)(一般是字符串)轉(zhuǎn)換為整數(shù)索引,或是在計(jì)算結(jié)束后將整數(shù)索引還原為相應(yīng)的標(biāo)簽
Spark ML包提供了幾個(gè)相關(guān)的轉(zhuǎn)換器,如:StringIndexer、IndexToString、OneHotEncoder、VectorIndexer,它們提供了十分方便的特征轉(zhuǎn)換功能,這些轉(zhuǎn)換器類都位于org.apache.spark.ml.feature包下
用于特征轉(zhuǎn)換的轉(zhuǎn)換器和其他機(jī)器學(xué)習(xí)算法一樣,也屬于ML Pipeline模型的一部分,可以用來構(gòu)建機(jī)器學(xué)習(xí)流水線。以StringIndexer為例,其存儲(chǔ)著進(jìn)行標(biāo)簽數(shù)值化過程的相關(guān)超參數(shù),是一個(gè)Estimator,對(duì)其調(diào)用fit()方法即可生成相應(yīng)的模型StringIndexerModel類。很顯然,它存儲(chǔ)了用于DataFrame進(jìn)行相關(guān)處理的參數(shù),是一個(gè)Transformer(其他轉(zhuǎn)換器也是同一原理)
1、StringIndexer
可以把一列類別型特征(或標(biāo)簽)進(jìn)行編碼,使其數(shù)值化。索引的范圍從0開始,該過程可以使相應(yīng)的特征索引化,使得某些無法接受類別型特征的算法可以使用,并提高諸如決策樹等機(jī)器學(xué)習(xí)算法的效率
- 索引構(gòu)建的順序?yàn)闃?biāo)簽的頻率,優(yōu)先編碼頻率較大的標(biāo)簽,所以出現(xiàn)頻率最高的標(biāo)簽為0號(hào)
- 如果輸入數(shù)值型的,會(huì)先把它轉(zhuǎn)化成字符型,再對(duì)其進(jìn)行編碼
from pyspark.ml.feature import StringIndexer# 構(gòu)建DataFrame,設(shè)置StringIndexer的輸入列和輸出列
df = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],["id", "category"])# 構(gòu)建轉(zhuǎn)換器,字符串類型轉(zhuǎn)為整型
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")# 通過fit()進(jìn)行模型訓(xùn)練,用訓(xùn)練出的模型對(duì)原數(shù)據(jù)集進(jìn)行處理
model = indexer.fit(df)
indexed = model.transform(df)
indexed.show() # 頻率最高的會(huì)被轉(zhuǎn)化為0+---+--------+--------------+
| id|category| categoryIndex|
+---+--------+--------------+
| 0| a| 0.0|
| 1| b| 2.0|
| 2| c| 1.0|
| 3| a| 0.0|
| 4| a| 0.0|
| 5| c| 1.0|
+---+--------+--------------+
a出現(xiàn)3次,故為0.0
c出現(xiàn)2次,故為1.0
b出現(xiàn)1次,故為2.0
2、IndexToString(與 StringIndexer 相反)
把標(biāo)簽索引的一列重新映射回原有的字符型標(biāo)簽。其主要使用場景一般都是和 StringIndexer 配合,先用StringIndexer將標(biāo)簽轉(zhuǎn)換成標(biāo)簽索引,進(jìn)行模型訓(xùn)練,然后在預(yù)測標(biāo)簽時(shí)再把標(biāo)簽索引轉(zhuǎn)換成原有的字符標(biāo)簽
from pyspark.ml.feature import IndexToString, StringIndexertoString = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
indexString = toString.transform(indexed)
indexString.select("id", "originalCategory").show()+---+----------------+
| id|originalCategory|
+---+----------------+
| 0| a|
| 1| b|
| 2| c|
| 3| a|
| 4| a|
| 5| c|
+---+----------------+
3、VectorIndexer
之前介紹的 StringIndexer 是針對(duì)單個(gè)類別型特征進(jìn)行轉(zhuǎn)換。倘若所有特征都已經(jīng)被組織在一個(gè)向量中,又想對(duì)其中某些單個(gè)分量進(jìn)行處理時(shí),Spark ML提供了 VectorIndexer類 來解決向量數(shù)據(jù)集中的類別型特征轉(zhuǎn)換。通過為其提供 maxCategories 超參數(shù),它可以自動(dòng)識(shí)別哪些特征是類別型并將原始值轉(zhuǎn)換為類別索引。它基于不同特征值的數(shù)量來識(shí)別哪些特征需要被類別化,那些取值可能性最多不超過?maxCategories 的特征會(huì)被認(rèn)為是類別型
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.linalg import Vector, Vectors# 每一個(gè)vector是一個(gè)樣本的特征向量,縱向編碼
df = spark.createDataFrame([(Vectors.dense(-1.0, 1.0, 1.0),), (Vectors.dense(-1.0, 3.0, 1.0),), (Vectors.dense(0.0, 5.0, 1.0), )], ["features"])# 構(gòu)建VectorIndexer轉(zhuǎn)換器,設(shè)置輸入輸出列,并進(jìn)行模型訓(xùn)練
indexer = VectorIndexer(maxCategories=2, inputCol="features", outputCol="indexed") # maxCategories表示超過此值后,不進(jìn)行類別編碼
indexerModel = indexer.fit(df)# 通過categoryMaps成員來獲得被轉(zhuǎn)換的特征及其映射
categoricalFeatures = indexerModel.categoryMaps.keys()
print("Choose" + str(len(categoricalFeatures)) + "categorical features:" + str(categoricalFeatures)) # Choose 2 categorical features:[0,2]# 把模型應(yīng)用于原有數(shù)據(jù),并打印結(jié)果
indexed = indexerModel.transform(df)
indexed.show()+--------------+-------------+
| features| indexed|
+--------------+-------------+
|[-1.0,1.0,1.0]|[1.0,1.0,0.0]|
|[-1.0,3.0,1.0]|[1.0,3.0,0.0]|
| [0.0,5.0,1.0]|[0.0,5.0,0.0]|
+--------------+-------------+# 第一列 [-1.0,-1.0,0.0] 不同值個(gè)數(shù)為2個(gè)=2,類別型特征,轉(zhuǎn)換
# 第二列 [1.0,3.0,5.0] 不同值個(gè)數(shù)為3個(gè)>2,不轉(zhuǎn)換
# 第三列 [1.0,1.0,1.0] 不同值個(gè)數(shù)為1個(gè)<2,類別型特征,轉(zhuǎn)換
(五)邏輯斯蒂回歸分類器
邏輯斯蒂回歸(Logistic Regression)是統(tǒng)計(jì)學(xué)習(xí)中的經(jīng)典分類方法,屬于對(duì)數(shù)線性模型。logistic回歸的因變量可以是二分類的,也可以是多分類的
1、iris數(shù)據(jù)集介紹
https://dblab.xmu.edu.cn/blog/wp-content/uploads/2017/03/iris.txt
iris 以鳶尾花的特征作為數(shù)據(jù)來源,數(shù)據(jù)集包含150個(gè)數(shù)據(jù),分為3類,每類50個(gè)數(shù)據(jù),每個(gè)數(shù)據(jù)包含4個(gè)屬性,是在數(shù)據(jù)挖掘、數(shù)據(jù)分類中常用的訓(xùn)練集測試集
2、iris數(shù)據(jù)集分類實(shí)例
(1)導(dǎo)入需要的包?
# 1 導(dǎo)入需要的包
from pyspark.ml.linalg import Vector, Vectors
from pyspark.sql import Row, functions
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, HashingTF, Tokenizer
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, BinaryLogisticRegressionSummary, LogisticRegression
(2)定制函數(shù)返回?cái)?shù)據(jù)字典
讀取文本文件,第一個(gè)map把每行數(shù)據(jù)用逗號(hào)隔開。每行被分成5個(gè)部分,前4部分是鳶尾花的4個(gè)特征,最后一部分是鳶尾花的類別。把特征存儲(chǔ)在Vector中,創(chuàng)建一個(gè)iris模式的RDD,然后轉(zhuǎn)換為DataFrame
# 2 定制一個(gè)函數(shù),來返回一個(gè)指定的數(shù)據(jù)字典
def f(x): # 傳入x為一個(gè)列表(4個(gè)特征+分類label)rel = {}rel['features'] = Vectors.dense(float(x[0]), float(x[1]), float(x[2]), float(x[3]))rel['label'] = str(x[4])return rel # 兩個(gè)鍵值對(duì)data = spark.sparkContext. \textFile("file:///usr/local/spark/iris.txt"). \map(lambda line: line.split(',')). \map(lambda p: Row(**f(p))). \ # 根據(jù)數(shù)據(jù)字典封裝成Row對(duì)象toDF()
data.show()+-----------------+-----------+
| features| label|
+-----------------+-----------+
|[5.1,3.5,1.4,0.2]|Iris-setosa|
|[4.9,3.0,1.4,0.2]|Iris-setosa|
|[4.7,3.2,1.3,0.2]|Iris-setosa|
|[4.6,3.1,1.5,0.2]|Iris-setosa|
|[5.0,3.6,1.4,0.2]|Iris-setosa|
(3)分別獲取標(biāo)簽列和特征列?
# 3 分別獲取標(biāo)簽列和特征列,進(jìn)行索引并進(jìn)行重命名
labelIndexer = StringIndexer(). \ # 把字符串標(biāo)簽轉(zhuǎn)換為數(shù)值型索引setInputCol("label"). \setOutputCol("indexedLabel"). \fit(data) # 評(píng)估器->轉(zhuǎn)換器featureIndexer = VectorIndexer(). \ # 把數(shù)值型特征向量轉(zhuǎn)換為索引數(shù)值型特征向量setInputCol("features"). \setOutputCol("indexedFeatures"). \fit(data) # 評(píng)估器->轉(zhuǎn)換器
(4)設(shè)置LogisticRegression算法的參數(shù)
# 具體可以設(shè)置的參數(shù),可以通過explainParams()來獲取,還能看到程序已經(jīng)設(shè)置的參數(shù)的結(jié)果
lr = LogisticRegression(). \setLabelCol("indexedLabel"). \setFeaturesCol("indexedFeatures"). \setMaxIter(100). \ # 循環(huán)次數(shù)為100次setRegParam(0.3). \ # 規(guī)范化項(xiàng)為0.3setElasticNetParam(0.8)
print("LogisticRegression parameters:\n" + lr.explainParams())
(5)設(shè)置一個(gè)IndexToString的轉(zhuǎn)換器?
構(gòu)建一個(gè)機(jī)器學(xué)習(xí)流水線,設(shè)置各個(gè)階段。上一個(gè)階段的輸出將是本階段的輸入??
# 5 把預(yù)測的類別(數(shù)值型prediction) 轉(zhuǎn)化成字符型的predictedLabel
labelConverter = IndexToString(). \setInputCol("prediction"). \ # 預(yù)測得到的分類setOutputCol("predictedLabel"). \setLabels(labelIndexer.labels) # 標(biāo)簽來源# 6 構(gòu)建機(jī)器學(xué)習(xí)流水線(Pipeline)
lrPipeline = Pipeline().setStages([labelIndexer, featureIndexer, lr, labelConverter])
(6)訓(xùn)練+預(yù)測?
Pipeline本質(zhì)上是一個(gè)評(píng)估器,當(dāng)Pipeline調(diào)用fit()的時(shí)候就產(chǎn)生了一個(gè)PipelineModel,它是一個(gè)轉(zhuǎn)換器。然后,這個(gè)PipelineModel就可以調(diào)用transform()來進(jìn)行預(yù)測,生成一個(gè)新的DataFrame,即利用訓(xùn)練得到的模型對(duì)測試集進(jìn)行驗(yàn)證
# 把數(shù)據(jù)集隨機(jī)分成訓(xùn)練集和測試集,其中訓(xùn)練集占70%
trainingData, testData = data.randomSplit([0.7, 0.3])
lrPipelineModel = lrPipeline.fit(trainingData)
lrPredictions = lrPipelineModel.transform(testData) # testData只包含4個(gè)特征,不包含label
(7)輸出預(yù)測的結(jié)果?
# 7 select選擇要輸出的列
# collect獲取所有行的數(shù)據(jù)
# 用foreach把每行打印出來
preRows = lrPredictions.select("label", "features", "probability", "predictedLabel").collect()
for row in preRows:label, features, probability, predictedLabel = rowprint("%s,%s --> prob=%s,predictedLabel:%s" % (label, features, probability, predictedLabel))Iris-setosa,[4.3,3.0,1.1,0.1] --> prob=[0.5243322260103365,0.2807261844423659,0.1949415895472976],predictedLabel:Iris-setosa
Iris-setosa,[4.4,2.9,1.4,0.2] --> prob=[0.49729174541655624,0.2912406744481094,0.2114675801353344],predictedLabel:Iris-setosa
Iris-setosa,[4.4,3.2,1.3,0.2] --> prob=[0.5033392716254922,0.28773708047332464,0.20892364790118315],predictedLabel:Iris-setosa
Iris-setosa,[4.6,3.2,1.4,0.2] --> prob=[0.49729174541655624,0.2912406744481094,0.2114675801353344],predictedLabel:Iris-setosa
(8)對(duì)訓(xùn)練的模型進(jìn)行評(píng)估?
用set方法把預(yù)測分類的列名和真實(shí)分類的列名進(jìn)行設(shè)置,然后計(jì)算預(yù)測準(zhǔn)確率?
# 8 創(chuàng)建一個(gè)MulticlassClassificationEvaluator實(shí)例
evaluator = MulticlassClassificationEvaluator(). \setLabelCol("indexedLabel"). \ # 真實(shí)字符串標(biāo)簽被轉(zhuǎn)換為數(shù)值型標(biāo)簽的結(jié)果setPredictionCol("prediction")
lrAccuracy = evaluator.evaluate(lrPredictions)
print("lrAccuracy=%f" % lrAccuracy) # 0.7774712643678161
(9)通過model來獲取訓(xùn)練得到的邏輯斯蒂模型?
# 9 lrPipelineModel是一個(gè)PipelineModel,因此可以通過調(diào)用它的stages方法來獲取lr模型
lrModel = lrPipelineModel.stages[2] # .stages是一個(gè)列表,lr是封裝在機(jī)器學(xué)習(xí)流水線里
print("\nCoefficients: \n " + str(lrModel.coefficientMatrix) +"\nIntercept: " + str(lrModel.interceptVector) +"\n numClasses: " + str(lrModel.numClasses) +"\n numFeatures: " + str(lrModel.numFeatures)Coefficients: 3 X 4 CSRMatrix
(0,2) -0.2419
(0,3) -0.1715
(1,3) 0.446
Intercept: [0.7417523479805953,-0.16623552721353418,-0.575516820767061]numClasses: 3numFeatures: 4
(六)決策樹分類器
決策樹是一種基本的分類和回歸方法,這里主要介紹分類。
1、決策樹
決策樹模型呈樹型結(jié)構(gòu),其中每個(gè)內(nèi)部節(jié)點(diǎn)表示一個(gè)屬性上的測試,每個(gè)分支代表一個(gè)測試輸出,每個(gè)葉節(jié)點(diǎn)代表一種類別。學(xué)習(xí)時(shí)利用訓(xùn)練數(shù)據(jù),根據(jù)損失函數(shù)最小化的原則建立決策樹模型;預(yù)測時(shí)對(duì)新的數(shù)據(jù)利用決策樹模型進(jìn)行分類?
決策樹學(xué)習(xí)步驟:特征選擇 - 決策樹生成 - 決策樹剪枝
2、iris數(shù)據(jù)集分類實(shí)例
(1)導(dǎo)入需要的包
from pyspark.ml.linalg import Vector, Vectors
from pyspark.sql import Row
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassificationModel, DecisionTreeClassifier
(2)讀取文本文件
def f(x): # 傳入x為一個(gè)列表(4個(gè)特征+分類label)rel = {}rel['features'] = Vectors.dense(float(x[0]), float(x[1]), float(x[2]), float(x[3]))rel['label'] = str(x[4])return rel # 兩個(gè)鍵值對(duì)data = spark.sparkContext. \textFile("file:///usr/local/spark/iris.txt"). \map(lambda line: line.split(',')). \ # 把每行的數(shù)據(jù)用逗號(hào)隔開map(lambda p: Row(**f(p))). \ # 根據(jù)數(shù)據(jù)字典封裝成Row對(duì)象toDF()
(3)處理特征和標(biāo)簽,以及數(shù)據(jù)分組?
# 3 分別獲取標(biāo)簽列和特征列,進(jìn)行索引并進(jìn)行重命名
labelIndexer = StringIndexer(). \ # 把字符串標(biāo)簽轉(zhuǎn)換為數(shù)值型索引setInputCol("label"). \setOutputCol("indexedLabel"). \fit(data) # 評(píng)估器->轉(zhuǎn)換器featureIndexer = VectorIndexer(). \ # 把原始特征向量轉(zhuǎn)換為索引值特征向量setInputCol("features"). \setOutputCol("indexedFeatures"). \setMaxCategories(4). \ # 不同數(shù)值個(gè)數(shù)≤4才轉(zhuǎn)換成數(shù)值型標(biāo)簽fit(data) # 評(píng)估器->轉(zhuǎn)換器labelConverter = IndexToString(). \setInputCol("prediction"). \ # 預(yù)測得到的分類(數(shù)值型分類標(biāo)簽)setOutputCol("predictedLabel"). \ # 轉(zhuǎn)換為字符串類型標(biāo)簽列setLabels(labelIndexer.labels) # 原來的字符串類型標(biāo)簽來源trainingData, testData = data.randomSplit([0.7, 0.3])
(4)構(gòu)建決策樹分類模型,設(shè)置決策樹的參數(shù)
通過set的方法來設(shè)置決策樹的參數(shù),也可以用ParamMap來設(shè)置。這里僅需設(shè)置特征列(FeaturesCol)和待預(yù)測列(LabelCol)。具體可以設(shè)置的參數(shù)可以通過 explainParams() 獲取?
dtClassifier = DecisionTreeClassifier(). \setLabelCol("indexedLabel"). \setFeaturesCol("indexedFeatures")
(5)構(gòu)建機(jī)器學(xué)習(xí)流水線Pipeline,調(diào)用fit()進(jìn)行模型訓(xùn)練
對(duì)評(píng)估器訓(xùn)練后得到模型,即轉(zhuǎn)換器,即可對(duì)測試數(shù)據(jù)進(jìn)行轉(zhuǎn)換,得到預(yù)測結(jié)果
dtPipeline = Pipeline().setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
dtPipelineModel = dtPipeline.fit(trainingData)
dtPredictions = dtPipelineModel.transform(testData)
dtPredictions.select("predictedLabel", "label", "features").show(20)
模型的預(yù)測準(zhǔn)確率:?
evaluator = MulticlassClassificationEvaluator(). \setLabelCol("indexedLabel"). \ # 真實(shí)字符串標(biāo)簽被轉(zhuǎn)換為數(shù)值型標(biāo)簽的結(jié)果setPredictionCol("prediction")
dtAccuracy = evaluator.evaluate(dtPredictions)
print("dtAccuracy=%f" % dtAccuracy) # 0.9726976552103888
(6)調(diào)用toDebugString方法查看訓(xùn)練的決策樹模型結(jié)構(gòu)
treeModelClassifier = dtPipelineModel.stages[2] # .stages是一個(gè)列表,dt是封裝在機(jī)器學(xué)習(xí)流水線里
print("Learned classification tree model:\n" + str(treeModelClassifier.toDebugString))