用手機什么軟件做網(wǎng)站百度推廣怎么操作流程
文章目錄
- Spark三種任務(wù)提交模式
- 寬依賴和窄依賴
- Stage
- Spark Job的三種提交模式
- Shuffle機制分析
- 未優(yōu)化的Hash Based Shuffle
- 優(yōu)化后的Hash Based Shuffle
- Sort-Based Shuffle
- Spark之checkpoint
- checkpoint概述
- checkpoint與持久化的區(qū)別
- checkPoint的使用
- checkpoint源碼分析
- Spark程序性能優(yōu)化
- 性能優(yōu)化分析
- 內(nèi)存都去哪了
- 性能優(yōu)化方案
- 高性能序列化類庫
- 持久化或者checkpoint
- JVM垃圾回收調(diào)優(yōu)
- 提高并行度
- 數(shù)據(jù)本地化
- Spark性能優(yōu)化之算子優(yōu)化
- map vs mapPartitions
- foreach vs foreachPartition
- repartition的使用
- reduceByKey和groupByKey的區(qū)別
Spark三種任務(wù)提交模式
寬依賴和窄依賴
- 窄依賴(Narrow Dependency):指父RDD的每個分區(qū)只被子RDD的一個分區(qū)所使用,例如map、filter等這些算子。一個RDD,對它的父RDD只有簡單的一對一的關(guān)系,也就是說,RDD的每個partition僅僅依賴于父RDD中的一個partition,父RDD和子RDD的partition之間的對應(yīng)關(guān)系,是一對一的。
- 寬依賴(Shuffle Dependency):父RDD的每個分區(qū)都可能被子RDD的多個分區(qū)使用,例如groupByKey、reduceByKey,sortBykey等算子,這些算子其實都會產(chǎn)生shuffle操作。也就是說,每一個父RDD的partition中的數(shù)據(jù)都可能會傳輸一部分到下一個RDD的每個partition中。此時就會出現(xiàn),父RDD和子RDD的partition之間,具有錯綜復(fù)雜的關(guān)系,那么,這種情況就叫做兩個RDD之間是寬依賴,同時,他們之間會發(fā)生shuffle操作。
下面來看圖具體分析一個案例,以單詞計數(shù)案例來分析
最左側(cè)是linesRDD,這個表示我們通過textFile讀取文件中的數(shù)據(jù)之后獲取的RDD。
接著是我們使用flatMap算子,對每一行數(shù)據(jù)按照空格切開,然后可以獲取到第二個RDD,這個RDD中包含的是切開的每一個單詞。在這里這兩個RDD就屬于一個窄依賴,因為父RDD的每個分區(qū)只被子RDD的一個分區(qū)所使用,也就是說他們的分區(qū)是一對一的,這樣就不需要經(jīng)過shuffle了。接著是使用map算子,將每一個單詞轉(zhuǎn)換成(單詞,1)這種形式,此時這兩個RDD也是一個窄依賴的關(guān)系,父RDD的分區(qū)和子RDD的分區(qū)也是一對一的。
最后我們會調(diào)用reduceByKey算子,此時會對相同key的數(shù)據(jù)進行分組,分到一個分區(qū)里面,并且進行聚合操作,此時父RDD的每個分區(qū)都可能被子RDD的多個分區(qū)使用,那這兩個RDD就屬于寬依賴了。
Stage
spark job是根據(jù)action算子觸發(fā)的,遇到action算子就會起一個job
Spark Job會被劃分為多個Stage,每一個Stage是由一組并行的Task組
注意:stage的劃分依據(jù)就是看是否產(chǎn)生了shuflle(即寬依賴),遇到一個shuffle操作就劃分為前后兩個stage
stage是由一組并行的task組成,stage會將一批task用TaskSet來封裝,提交給TaskScheduler進行分配,最后發(fā)送到Executor執(zhí)行
注意:Stage的劃分規(guī)則:從后往前,遇到寬依賴就劃分Stage
為什么是從后往前呢?因為RDD之間是有血緣關(guān)系的,后面的RDD依賴前面的RDD,也就是說后面的RDD要等前面的RDD執(zhí)行完,才會執(zhí)行。所以從后往前遇到寬依賴就劃分為兩個stage,shuffle前一個,shuffle后一個。如果整個過程沒有產(chǎn)生shuffle那就只會有一個stage。
看這個圖
RDD G 往前推,到RDD B的時候,是窄依賴,所以不切分Stage,再往前到RDD A,此時產(chǎn)生了寬依賴,所以RDD A屬于一個Stage、RDD B 和 G屬于一個Stage。再看下面,RDD G到RDD F,產(chǎn)生了寬依賴,所以RDD F屬于一個Stage,因為RDD F和 RDD C、D、E這幾個RDD沒有產(chǎn)生寬依賴,都是窄依賴,所以他們屬于一個Stage。所以這個圖中,RDD A 單獨一個stage1,RDD C、D、E、F被劃分在stage2中,最后RDD B和RDD G劃分在了stage3 里面.
注意:Stage劃分是從后往前劃分,但是stage執(zhí)行時從前往后的,這就是為什么后面先切割的stage為什么編號是3.
Spark Job的三種提交模式
- 第一種,standalone模式,基于Spark自己的standalone集群。
指定–master spark://bigdata01:7077 - 第二種,是基于YARN的client模式。指定–master yarn --deploy-mode client
這種方式主要用于測試,查看日志方便一些,部分日志會直接打印到控制臺上面,因為driver進程運行在本地客戶端,就是提交Spark任務(wù)的那個客戶端機器,driver負責(zé)調(diào)度job,會與yarn集群產(chǎn)生大量的通信,一般情況下Spark客戶端機器和Hadoop集群的機器是無法內(nèi)網(wǎng)通信,只能通過外網(wǎng),這樣在大量通信的情況下會影響通信效率,并且當(dāng)我們執(zhí)行一些action操作的時候數(shù)據(jù)也會返回給driver端,driver端機器的配置一般都不高,可能會導(dǎo)致內(nèi)存溢出等問題。 - 第三種,是基于YARN的cluster模式。【推薦】指定–master yarn --deploy-mode cluster
這種方式driver進程運行在集群中的某一臺機器上,這樣集群內(nèi)部節(jié)點之間通信是可以通過內(nèi)網(wǎng)通信的,并且集群內(nèi)的機器的配置也會比普通的客戶端機器配置高,所以就不存在yarn-client模式的一些問題了,只不過這個時候查看日志只能到集群上面看了,這倒沒什么影響。
左邊是standalone模式,現(xiàn)在我們使用的提交方式,driver進程是在客戶端機器中的,其實針對standalone模式而言,這個Driver進程也是可以運行在集群中的。
來看一下官網(wǎng)文檔,standalone模式也是支持的,通過指定deploy-mode 為cluster即可
中間的值yarn client模式,由于是on yarn模式,所以里面是yarn集群的進程,此時driver進程就在提交spark任務(wù)的客戶端機器上了。最右邊這個是yarn cluster模式,driver進程就會在集群中的某一個節(jié)點上面。
Shuffle機制分析
在MapReduce框架中,Shuffle是連接Map和Reduce之間的橋梁,Map階段通過shuffle讀取數(shù)據(jù)并輸出到對應(yīng)的Reduce;而Reduce階段負責(zé)從Map端拉取數(shù)據(jù)并進行計算。在整個shuffle過程中,往往伴隨著大量的磁盤和網(wǎng)絡(luò)I/O。所以shuffle性能的高低也直接決定了整個程序的性能高低。Spark也會有自己的shuffle實現(xiàn)過程。
我們首先來看一下
在Spark中,什么情況下,會發(fā)生shuffle?
reduceByKey、groupByKey、sortByKey、countByKey、join等操作都會產(chǎn)生shuffle。那下面我們來詳細分析一下Spark中的shuffle過程。Spark的shuffle歷經(jīng)了幾個過程
- Spark 0.8及以前 使用Hash Based Shuffle
- Spark 0.8.1 為Hash Based Shuffle引入File Consolidation機制
- Spark1.6之后使用Sort-Base Shuffle,因為Hash Based Shuffle存在一些不足所以就把它替換掉了。
所以Spark Shuffle 一共經(jīng)歷了這幾個過程:
- 未優(yōu)化的 Hash Based Shuffle
- 優(yōu)化后的Hash Based Shuffle
- Sort-Based Shuffle
未優(yōu)化的Hash Based Shuffle
來看一個圖,假設(shè)我們是在執(zhí)行一個reduceByKey之類的操作,此時就會產(chǎn)生shuffle。
shuffle里面會有兩種task,一種是shuffleMapTask,負責(zé)拉取前一個RDD中的數(shù)據(jù),還有一個ResultTask,負責(zé)把拉取到的數(shù)據(jù)按照規(guī)則匯總起來
1:假設(shè)有1個節(jié)點,這個節(jié)點上有2個CPU,上面運行了4個ShuffleMapTask,這樣的話其實同時只有2個ShuffleMapTask是并行執(zhí)行的,因為一個cpu core同時只能執(zhí)行一個ShuffleMapTask。
2:每個ShuffleMapTask都會為每個ResultTask創(chuàng)建一份Bucket緩存,以及對應(yīng)的ShuffleBlockFile磁盤文件這樣的話,每一個ShuffleMapTask都會產(chǎn)生4份Bucket緩存和對應(yīng)的4個ShuffleBlockFile文件,分別對應(yīng)下面的4個ResultTask
3:假設(shè)另一個節(jié)點上面運行了4個ResultTask現(xiàn)在等著獲取ShuffleMapTask的輸出數(shù)據(jù),來完成比如ReduceByKey的操作。
這是這個流程,注意了,如果有100個MapTask,100個ResultTask,那么會產(chǎn)生10000個本地磁盤文件,這樣需要頻繁的磁盤IO,是比較影響性能的。
注意,那個bucket緩存是非常重要的,ShuffleMapTask會把所有的數(shù)據(jù)都寫入Bucket緩存之后,才會刷寫到對應(yīng)的磁盤文件中,但是這就有一個問題,如果map 端數(shù)據(jù)過多,那么很容易造成內(nèi)存溢出,所以spark在優(yōu)化后的Hash Based Shuffle中對這個問題進行了優(yōu)化,默認這個內(nèi)存緩存是100kb,當(dāng)Bucket中的數(shù)據(jù)達到了閾值之后,就會將數(shù)據(jù)一點一點地刷寫到對應(yīng)的ShuffleBlockFile磁盤中了。這種操作的優(yōu)點,是不容易發(fā)生內(nèi)存溢出。缺點在于,如果內(nèi)存緩存過小的話,那么可能發(fā)生過多的磁盤io操作。所以,這里的內(nèi)存緩存大小,是可以根據(jù)實際的業(yè)務(wù)情況進行優(yōu)化的。
優(yōu)化后的Hash Based Shuffle
看這個優(yōu)化后的shuffle流程
1:假設(shè)機器上有2個cpu,4個shuffleMaptask,這樣同時只有2個在并行執(zhí)行
2:在這個版本中,Spark引入了consolidation機制,一個ShuffleMapTask將數(shù)據(jù)寫入ResultTask數(shù)量的本地文件中,這個是不變的,但是當(dāng)下一個ShuffleMapTask運行的時候,可以直接將數(shù)據(jù)寫入之前產(chǎn)生的本地文件中,相當(dāng)于對多個ShuffleMapTask的輸出進行了合并,從而大大減少了本地磁盤中文件的數(shù)量。
此時文件的數(shù)量變成了CPU core數(shù)量 * ResultTask數(shù)量,比如每個節(jié)點上有2個CPU,有100個ResultTask,那么每個節(jié)點上會產(chǎn)生200個文件。這個時候文件數(shù)量就變得少多了。但是如果 ResultTask端的并行任務(wù)過多的話則 CPU core * Result Task 依舊過大,也會產(chǎn)生很多小文件
Sort-Based Shuffle
引入 Consolidation 機制雖然在一定程度上減少了磁盤文件數(shù)量,但是不足以有效提高 Shuffle 的性能,這種情況只適合中小型數(shù)據(jù)規(guī)模的數(shù)據(jù)處理。為了讓 Spark 能在更大規(guī)模的集群上高性能處理大規(guī)模的數(shù)據(jù),因此 Spark 引入了 Sort-Based Shuffle。
該機制針對每一個 ShuffleMapTask 都只創(chuàng)建一個文件,將所有的 ShuffleMapTask 的數(shù)據(jù)都寫入同一個文件,并且對應(yīng)生成一個索引文件。
以前的數(shù)據(jù)是放在內(nèi)存中,等到數(shù)據(jù)寫完了再刷寫到磁盤,現(xiàn)在為了減少內(nèi)存的使用,在內(nèi)存不夠用的時候,可以將內(nèi)存中的數(shù)據(jù)溢寫到磁盤,結(jié)束的時候,再將這些溢寫的文件聯(lián)合內(nèi)存中的數(shù)據(jù)一起進行歸并,從而減少內(nèi)存的使用量。一方面文件數(shù)量顯著減少,另一方面減少緩存所占用的內(nèi)存大小,而且同時避免 GC 的風(fēng)險和頻率。
Spark之checkpoint
checkpoint概述
checkpoint是Spark提供的一個比較高級的功能。有時候,我們的Spark任務(wù),比較復(fù)雜,從初始化RDD開始,到最后整個任務(wù)完成,有比較多的步驟,比如超過10個transformation算子。而且,整個任務(wù)運行的時間也特別長,比如通常要運行1~2個小時。在這種情況下,就比較適合使用checkpoint功能了。
因為對于特別復(fù)雜的Spark任務(wù),有很高的風(fēng)險會出現(xiàn)某個要反復(fù)使用的RDD因為節(jié)點的故障導(dǎo)致丟失,雖然之前持久化過,但是還是導(dǎo)致數(shù)據(jù)丟失了。那么也就是說,出現(xiàn)失敗的時候,沒有容錯機制,所以當(dāng)后面的transformation算子,又要使用到該RDD時,就會發(fā)現(xiàn)數(shù)據(jù)丟失了,此時如果沒有進行容錯處理的話,那么就需要再重新計算一次數(shù)據(jù)了。所以針對這種Spark Job,如果我們擔(dān)心某些關(guān)鍵的,在后面會反復(fù)使用的RDD,因為節(jié)點故障導(dǎo)致數(shù)據(jù)丟失,那么可以針對該RDD啟動checkpoint機制,實現(xiàn)容錯和高可用
那如何使用checkPoint呢?
首先要調(diào)用SparkContext的setCheckpointDir()方法,設(shè)置一個容錯的文件系統(tǒng)的目錄,比如HDFS;然后,對RDD調(diào)用checkpoint()方法。最后,在RDD所在的job運行結(jié)束之后,會啟動一個單獨的job,將checkpoint設(shè)置過的RDD的數(shù)據(jù)寫入之前設(shè)置的文件系統(tǒng)中。這是checkpoint使用的基本步驟,很簡單,那我們下面先從理論層面分析一下當(dāng)我們設(shè)置好checkpoint之后,Spark底層都做了哪些事情
1:SparkContext設(shè)置checkpoint目錄,用于存放checkpoint的數(shù)據(jù);對RDD調(diào)用checkpoint方法,然后它就會被RDDCheckpointData對象進行管理,此時這個RDD的checkpoint狀態(tài)會被設(shè)置為Initialized
2:待RDD所在的job運行結(jié)束,會調(diào)用job中最后一個RDD的doCheckpoint方法,該方法沿著RDD的血緣關(guān)系向上查找被checkpoint()方法標記過的RDD,并將其checkpoint狀態(tài)從Initialized設(shè)置為
CheckpointingInProgress
3:啟動一個單獨的job,來將血緣關(guān)系中標記為CheckpointInProgress的RDD執(zhí)行checkpoint操作,也就是將其數(shù)據(jù)寫入checkpoint目錄
4:將RDD數(shù)據(jù)寫入checkpoint目錄之后,會將RDD狀態(tài)改變?yōu)镃heckpointed;并且還會改變RDD的血緣關(guān)系,即會清除掉RDD所有依賴的RDD;最后還會設(shè)置其父RDD為新創(chuàng)建的CheckpointRDD
checkpoint與持久化的區(qū)別
這里所說的checkpoint和我們之前講的RDD持久化有什么區(qū)別嗎?
- lineage是否發(fā)生改變
linage(血緣關(guān)系)說的就是RDD之間的依賴關(guān)系
持久化,只是將數(shù)據(jù)保存在內(nèi)存中或者本地磁盤文件中,RDD的lineage(血緣關(guān)系)是不變的;Checkpoint執(zhí)行之后,RDD就沒有依賴的RDD了,也就是它的lineage改變了 - 丟失數(shù)據(jù)的可能性
持久化的數(shù)據(jù)丟失的可能性較大,如果采用 persist 把數(shù)據(jù)存在內(nèi)存中的話,雖然速度最快但是也是最不可靠的,就算放在磁盤上也不是完全可靠的,因為磁盤也會損壞。Checkpoint的數(shù)據(jù)通常是保存在高可用文件系統(tǒng)中(HDFS),丟失的可能性很低
建議:對需要checkpoint的RDD,先執(zhí)行persist(StorageLevel.DISK_ONLY) 為什么呢?
因為默認情況下,如果某個RDD沒有持久化,但是設(shè)置了checkpoint,那么這個時候,本來Spark任務(wù)已經(jīng)執(zhí)行結(jié)束了,但是由于中間的RDD沒有持久化,在進行checkpoint的時候想要將這個RDD的數(shù)據(jù)寫入外部存儲系統(tǒng)的話,就需要重新計算這個RDD的數(shù)據(jù),再將其checkpoint到外部存儲系統(tǒng)中。
如果對需要checkpoint的rdd進行了基于磁盤的持久化,那么后面進行checkpoint操作時,就會直接從磁盤上讀取rdd的數(shù)據(jù)了,就不需要重新再計算一次了,這樣效率就高了。那在這能不能使用基于內(nèi)存的持久化呢?當(dāng)然是可以的,不過沒那個必要。
checkPoint的使用
演示一下:將一個RDD的數(shù)據(jù)持久化到HDFS上面
scala代碼如下:
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:checkpoint的使用
*/
object CheckPointOpScala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("CheckPointOpScala")val sc = new SparkContext(conf)if(args.length==0){System.exit(100)}val outputPath = args(0)//1:設(shè)置checkpint目錄sc.setCheckpointDir("hdfs://bigdata01:9000/chk001")val dataRDD = sc.textFile("hdfs://bigdata01:9000/hello_10000000.dat")//2:對rdd執(zhí)行checkpoint操作dataRDD.checkpoint()dataRDD.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).saveAsTextFile(outputPath)sc.stop()}
}
下面我們把這個任務(wù)打包提交到集群上運行一下,看一下效果。先確保hadoop集群是正常運行的,以及hadoop中的historyserver進程和spark的historyserver進程也是正常運行的。測試數(shù)據(jù)之前已經(jīng)上傳到了hdfs上面,如果沒有則需要上傳
[root@bigdata01 soft]# hdfs dfs -ls /hello_10000000.dat
-rw-r--r-- 2 root supergroup 1860100000 2020-04-28 22:15 /hello_10000000.dat
將pom.xml中的spark-core的依賴設(shè)置為provided,然后編譯打包
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.4.3</version><scope>provided</scope>
</dependency>
將打包的jar包上傳到bigdata04的/data/soft/sparkjars目錄,創(chuàng)建一個新的spark-submit腳本
[root@bigdata04 sparkjars]# cp wordCountJob.sh checkPointJob.sh
[root@bigdata04 sparkjars]# vi checkPointJob.sh
spark-submit \
--class com.imooc.scala.CheckPointOpScala \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 1 \
db_spark-1.0-SNAPSHOT-jar-with-dependencies.jar \
/out-chk001
提交任務(wù)
[root@bigdata04 sparkjars]# sh -x checkPointJob.sh
執(zhí)行成功之后可以到 setCheckpointDir 指定的目錄中查看一下,可以看到目錄中會生成對應(yīng)的文件保存rdd中的數(shù)據(jù),只不過生成的文件不是普通文本文件,直接查看文件中的內(nèi)容顯示為亂碼。
接下來進到Y(jié)ARN的8088界面查看
點擊Tracking UI進入spark的ui界面
看第一個界面jobs
在這可以看出來產(chǎn)生了2個job,
第一個job是我們正常的任務(wù)執(zhí)行,執(zhí)行了39s,一共產(chǎn)生了28個task任務(wù)
第二個job是checkpoint啟動的job,執(zhí)行了35s,一共產(chǎn)生了14個task任務(wù)
stage id:stage的編號,從0開始
Duration:stage執(zhí)行消耗的時間
Tasks:Successed/Total:task執(zhí)行成功數(shù)量/task總量
Input:輸入數(shù)據(jù)量
ouput:輸出數(shù)據(jù)量
shuffle read/shuffle read:shuffle過程傳輸數(shù)據(jù)量
點擊這個界面中的DAG Visualization可以看到當(dāng)前這個任務(wù)stage的劃分情況,可以看到每個Stage包含哪些算子
checkpoint源碼分析
前面我們通過理論層面分析了checkpoint的原理,以及演示了checkpoint的使用。下面我們通過源碼層面來對我們前面分析的理論進行驗證。先下載spark源碼,下載流程和下載spark安裝包的流程一樣
把下載的安裝包解壓到idea項目目錄中
打開spark-2.4.3源碼目錄,進入core目錄,這個是spark的核心代碼,我們要查看的checkpoint的源碼就在這個項目中。在idea中打開core這個子項目
下面我們就來分析一下RDD的checkpoint功能:
checkpoint功能可以分為兩塊
1:checkpoint的寫操作
將指定RDD的數(shù)據(jù)通過checkpoint存儲到指定外部存儲中
2:checkpoint的讀操作
任務(wù)中RDD數(shù)據(jù)在使用過程中丟失了,正好這個RDD之前做過checkpoint,所以這時就需要通過checkpoint來恢復(fù)數(shù)據(jù)
先看checkpoint的寫操作
1.1 : 當(dāng) 我 們 在 自 己 開 發(fā) 的 spark 任 務(wù) 中 先 調(diào) 用 sc.setCheckpointDir 時 , 底 層 其 實 就 會 調(diào) 用SparkContext中的 setCheckpointDir 方法
def setCheckpointDir(directory: String) {
// If we are running on a cluster, log a warning if the directory is local.
// Otherwise, the driver may attempt to reconstruct the checkpointed RDD fr
// its own local file system, which is incorrect because the checkpoint fil// are actually on the executor machines.if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {logWarning("Spark is not running in local mode, therefore the checkpoints"must not be on the local filesystem. Directory '$directory' " +"appears to be on the local filesystem.")}//根據(jù)我們傳過來的目錄,后面再拼上一個子目錄,子目錄使用一個UUID隨機字符串//使用HDFS的javaAPI 在HDFS上創(chuàng)建目錄checkpointDir = Option(directory).map { dir =>val path = new Path(dir, UUID.randomUUID().toString)val fs = path.getFileSystem(hadoopConfiguration)fs.mkdirs(path)fs.getFileStatus(path).getPath.toString}
}
1.2:接著我們會調(diào)用 RDD.checkpoint 方法,此時會執(zhí)行RDD這個class中的 checkpoint 方法
//這里相當(dāng)于是checkpoint的一個標記,并沒有真正執(zhí)行checkpoint
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// NOTE: we use a global lock here due to complexities downstream with ensu
// children RDD partitions point to the correct parent partitions. In the f
// we should revisit this consideration.//如果SparkContext沒有設(shè)置checkpointDir,則拋出異常if (context.checkpointDir.isEmpty) {throw new SparkException("Checkpoint directory has not been set in the Sp} else if (checkpointData.isEmpty) {//如果設(shè)置了,則創(chuàng)建RDDCheckpointData的子類,這個子類主要負責(zé)管理RDD的checkpoi//并且會初始化checkpoint狀態(tài)為InitializedcheckpointData = Some(new ReliableRDDCheckpointData(this))}
}
這個checkpoint方法執(zhí)行完成之后,這個流程就結(jié)束了。
1.3:剩下的就是在這個設(shè)置了checkpint的RDD所在的job執(zhí)行結(jié)束之后,Spark會調(diào)用job中最后一個RDD的 doCheckpoint 方法。
這個邏輯是在SparkContext這個class的runJob方法中,當(dāng)執(zhí)行到Spark中的action算子時,這個runJob方法會被觸發(fā),開始執(zhí)行任務(wù)。
這個runJob的最后一行會調(diào)用rdd中的 doCheckpoint 方法
//在有action動作時,會觸發(fā)sparkcontext對runJob的調(diào)用
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {if (stopped.get()) {throw new IllegalStageException("SparkContext has been shutdown")}val callSite = getCallSiteval cleanedFunc = clean(func)logInfo("Starting job: " + callSite.shortForm)if (conf.getBoolean("spark.logLineage", false)) {logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)}dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler,progressBar.foreach(_.finishAll())//在這里會執(zhí)行doCheckpoint()rdd.doCheckpoint()
}
1.4:接著會進入到RDD中的 doCheckpoint 方法
這里面最終會調(diào)用 RDDCheckpointData 的 checkpoint 方法
checkpointData.get.checkpoint()
1.5:接下來進入到 RDDCheckpointData 的 checkpoint 方法中
這里面會調(diào)用子類 ReliableCheckpointRDD 中的 doCheckpoint() 方法
final def checkpoint(): Unit = {
// Guard against multiple threads checkpointing the same RDD by
// atomically flipping the Stage of this RDDCheckpointData
將checkpoint的狀態(tài)從Initialized置為CheckpointingInProgressRDDCheckpointData.synchronized {if (cpStage == Initialized) {cpStage = CheckpointingInProgress} else {return}
}
//調(diào)用子類的doCheckpoint,默認會使用ReliableCheckpointRDD子類,創(chuàng)建一個新的Checval newRDD = doCheckpoint()// Update our Stage and truncate the RDD lineage//將checkpoint狀態(tài)置為Checkpointed狀態(tài),并且改變rdd之前的依賴,設(shè)置父rdd為新創(chuàng)建RDDCheckpointData.synchronized {cpRDD = Some(newRDD)cpStage = Checkpointedrdd.markCheckpointed()}
}
1.6:接著來進入 ReliableCheckpointRDD 中的 doCheckpoint() 方法。
這里面會調(diào)用 ReliableCheckpointRDD 中的 writeRDDToCheckpointDirectory 方法將rdd的數(shù)據(jù)寫入HDFS中的 checkpoint 目錄,并且返回創(chuàng)建的 CheckpointRDD
1.7:接下來進入 ReliableCheckpointRDD 的writeRDDToCheckpointDirectory 方法
這里面最終會啟動一個job,將checkpoint的數(shù)據(jù)寫入到指定的HDFS目錄中
Spark程序性能優(yōu)化
性能優(yōu)化分析
一個計算任務(wù)的執(zhí)行主要依賴于CPU、內(nèi)存、帶寬。Spark是一個基于內(nèi)存的計算引擎,所以對它來說,影響最大的可能就是內(nèi)存,一般我們的任務(wù)遇到了性能瓶頸大概率都是內(nèi)存的問題,當(dāng)然了CPU和帶寬也可能會影響程序的性能,這個情況也不是沒有的,只是比較少。
Spark性能優(yōu)化,其實主要就是在于對內(nèi)存的使用進行調(diào)優(yōu)。通常情況下,如果你的Spark程序計算的數(shù)據(jù)量比較小,并且你的內(nèi)存足夠使用,那么只要網(wǎng)絡(luò)不至于卡死,一般是不會有大的性能問題的。但是Spark程序的性能問題往往出現(xiàn)在針對大數(shù)據(jù)量進行計算(比如上億條數(shù)的數(shù)據(jù),或者上T規(guī)模的數(shù)據(jù)),這個時候如果內(nèi)存分配不合理就會比較慢,所以,Spark性能優(yōu)化,主要是對內(nèi)存進行優(yōu)化。
內(nèi)存都去哪了
- 每個Java對象,都有一個對象頭,會占用16個字節(jié),主要是包括了一些對象的元信息,比如指向它的類的指針。如果一個對象本身很小,比如就包括了一個int類型的field,那么它的對象頭實際上比對象自身還要大。
- Java的String對象的對象頭,會比它內(nèi)部的原始數(shù)據(jù),要多出40個字節(jié)。因為它內(nèi)部使用char數(shù)組來保存內(nèi)部的字符序列,并且還要保存數(shù)組長度之類的信息。
- Java中的集合類型,比如HashMap和LinkedList,內(nèi)部使用的是鏈表數(shù)據(jù)結(jié)構(gòu),所以對鏈表中的每一個數(shù)據(jù),都使用了Entry對象來包裝。Entry對象不光有對象頭,還有指向下一個Entry的指針,通常占用8個字節(jié)。
所以把原始文件中的數(shù)據(jù)轉(zhuǎn)化為內(nèi)存中的對象之后,占用的內(nèi)存會比原始文件中的數(shù)據(jù)要大。那我如何預(yù)估程序會消耗多少內(nèi)存呢?
通過cache方法,可以看到RDD中的數(shù)據(jù)cache到內(nèi)存中之后占用多少內(nèi)存,這樣就能看出了
代碼如下:這個測試代碼就只寫一個scala版本的了
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:測試內(nèi)存占用情況
*/
object TestMemoryScala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("TestMemoryScala").setMaster("local")val sc = new SparkContext(conf)val dataRDD = sc.textFile("hdfs://bigdata01:9000/hello_10000000.dat").cache()val count = dataRDD.count()println(count)//while循環(huán)是為了保證程序不結(jié)束,方便在本地查看4040頁面中的storage信息while(true){;}}
}
執(zhí)行代碼,訪問localhost的4040端口界面
這個界面其實就是spark的任務(wù)界面,在本地運行任務(wù)的話可以直接訪問4040界面查看。
點擊stages可以看到任務(wù)的原始輸入數(shù)據(jù)是多大
點擊storage可以看到將數(shù)據(jù)加載到內(nèi)存,生成RDD之后的大小
這樣我們就能知道這一份數(shù)據(jù)在RDD中會占用多少內(nèi)存了,這樣在使用的時候,如果想要把數(shù)據(jù)全部都加載進內(nèi)存,就需要給這個任務(wù)分配這么多內(nèi)存了,當(dāng)然了你分配少一些也可以,只不過這樣計算效率會變低,因為RDD中的部分數(shù)據(jù)內(nèi)存放不下就會放到磁盤了。
性能優(yōu)化方案
下面我們通過這幾個方式來實現(xiàn)對Spark程序的性能優(yōu)化
- 高性能序列化類庫
- 持久化或者checkpoint
- JVM垃圾回收調(diào)優(yōu)
- 提高并行度
- 數(shù)據(jù)本地化
- 算子優(yōu)化
高性能序列化類庫
在任何分布式系統(tǒng)中,序列化都是扮演著一個重要的角色的。
如果使用的序列化技術(shù),在執(zhí)行序列化操作的時候很慢,或者是序列化后的數(shù)據(jù)還是很大,那么會讓分布式應(yīng)用程序的性能下降很多。所以,進行Spark性能優(yōu)化的第一步,就是進行序列化的性能優(yōu)化。Spark默認會在一些地方對數(shù)據(jù)進行序列化,如果我們的算子函數(shù)使用到了外部的數(shù)據(jù)(比如Java中的自定義類型),那么也需要讓其可序列化,否則程序在執(zhí)行的時候是會報錯的,提示沒有實現(xiàn)序列化,這個一定要注意。
原因是這樣的:
因為Spark的初始化工作是在Driver進程中進行的,但是實際執(zhí)行是在Worker節(jié)點的Executor進程中進行的;當(dāng)Executor端需要用到Driver端封裝的對象時,就需要把Driver端的對象通過序列化傳輸?shù)紼xecutor端,這個對象就需要實現(xiàn)序列化。否則會報錯,提示對象沒有實現(xiàn)序列化
注意了,其實遇到這種沒有實現(xiàn)序列化的對象,解決方法有兩種
- 如果此對象可以支持序列化,則將其實現(xiàn)Serializable接口,讓它支持序列化
- 如果此對象不支持序列化,針對一些數(shù)據(jù)庫連接之類的對象,這種對象是不支持序列化的,所以可以把這個代碼放到算子內(nèi)部,這樣就不會通過driver端傳過去了,它會直接在executor中執(zhí)行。
Spark對于序列化的便捷性和性能進行了一個取舍和權(quán)衡。默認情況下,Spark傾向于序列化的便捷性,使用了Java自身提供的序列化機制——基于 ObjectInputStream 和 ObjectOutputStream 的序列化機制,因為這種方式是Java原生提供的,使用起來比較方便,但是Java序列化機制的性能并不高。序列化的速度相對較慢,而且序列化以后的數(shù)據(jù),相對來說還是比較大,比較占空間。所以,如果你的Spark應(yīng)用程序?qū)?nèi)存很敏感,那默認的Java序列化機制并不是最好的選擇。
Spark實際上提供了兩種序列化機制:
Java序列化機制和Kryo序列化機制
Spark只是默認使用了java這種序列化機制
- Java序列化機制:默認情況下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream機制進行對象的序列化。只要你的類實現(xiàn)了Serializable接口,那么都是可以序列化的。Java序列化機制的速度比較慢,而且序列化后的數(shù)據(jù)占用的內(nèi)存空間比較大,這是它的缺點
- Kryo序列化機制:Spark也支持使用Kryo序列化。Kryo序列化機制比Java序列化機制更快,而且序列化后的數(shù)據(jù)占用的空間更小,通常比Java序列化的數(shù)據(jù)占用的空間要小10倍左右。
Kryo序列化機制之所以不是默認序列化機制的原因:
- 第一點:因為有些類型雖然實現(xiàn)了Seriralizable接口,但是它也不一定能夠被Kryo進行序列化;
- 第二點:如果你要得到最佳的性能,Kryo還要求你在Spark應(yīng)用程序中,對所有你需要序列化的類型都進行手工注冊,這樣就比較麻煩了
如果要使用Kryo序列化機制
首先要用 SparkConf 設(shè)置 spark.serializer 的值為 org.apache.spark.serializer.KryoSerializer ,就是將Spark的序列化器設(shè)置為 KryoSerializer 。這樣,Spark在進行序列化時,就會使用Kryo進行序列化了。使用Kryo時針對需要序列化的類,需要預(yù)先進行注冊,這樣才能獲得最佳性能——如果不注冊的話,Kryo也能正常工作,只是Kryo必須時刻保存類型的全類名,反而占用不少內(nèi)存。
Spark默認對Scala中常用的類型在Kryo中做了注冊,但是,如果在自己的算子中,使用了外部的自定義類型的對象,那么還是需要對其進行注冊。
注冊自定義的數(shù)據(jù)類型格式:
conf.registerKryoClasses(...)
注意:如果要序列化的自定義的類型,字段特別多,此時就需要對Kryo本身進行優(yōu)化,因為Kryo需要調(diào)用 SparkConf.set() 方法,設(shè)置 spark.kryoserializer.buffer.mb 參數(shù)的值,將其調(diào)大,默認值為 2 ,單位是 MB ,也就是說最大能緩存 2M 的對象,然后進行序列化??梢栽诒匾獣r將其調(diào)大。
什么場景下適合使用Kryo序列化?
一般是針對一些自定義的對象,例如我們自己定義了一個對象,這個對象里面包含了幾十M,或者上百M的數(shù)據(jù),然后在算子函數(shù)內(nèi)部,使用到了這個外部的大對象。
如果默認情況下,讓Spark用java序列化機制來序列化這種外部的大對象,那么就會導(dǎo)致序列化速度比較慢,并且序列化以后的數(shù)據(jù)還是比較大。所以,在這種情況下,比較適合使用Kryo序列化類庫,來對外部的大對象進行序列化,提高序列化速度,減少序列化后的內(nèi)存空間占用。
用代碼實現(xiàn)一個案例:
scala代碼如下:
`
package com.imooc.scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:Kryo序列化的使用
*/
object KryoSerScala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("KryoSerScala").setMaster("local")//指定使用kryo序列化機制,注意:如果使用了registerKryoClasses,其實這一行設(shè)置.set("spark.serializer","org.apache.spark.serializer.KryoSerializer").registerKryoClasses(Array(classOf[Person]))//注冊自定義的數(shù)據(jù)類型val sc = new SparkContext(conf)val dataRDD = sc.parallelize(Array("hello you","hello me"))val wordsRDD = dataRDD.flatMap(_.split(" "))val personRDD = wordsRDD.map(word=>Person(word,18)).persist(StorageLevel.MpersonRDD.foreach(println(_))//while循環(huán)是為了保證程序不結(jié)束,方便在本地查看4040頁面中的storage信息while (true) {;}}
}
case class Person(name: String,age: Int) extends Serializable
執(zhí)行任務(wù),然后訪問localhost的4040界面
在界面中可以看到cache的數(shù)據(jù)大小是 31 字節(jié)。
那我們把kryo序列化設(shè)置去掉,使用默認的java序列化看一下效果
修改代碼,注釋掉這兩行代碼即可
//.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
//.registerKryoClasses(Array(classOf[Person]))
運行任務(wù),再訪問4040界面
發(fā)現(xiàn)此時占用的內(nèi)存空間是 138 字節(jié),比使用kryo的方式內(nèi)存空間多占用了將近5倍。
所以從這可以看出來,使用 kryo 序列化方式對內(nèi)存的占用會降低很多。
持久化或者checkpoint
針對程序中多次被transformation或者action操作的RDD進行持久化操作,避免對一個RDD反復(fù)進行計算,再進一步優(yōu)化,使用Kryo序列化的持久化級別,減少內(nèi)存占用。為了保證RDD持久化數(shù)據(jù)在可能丟失的情況下還能實現(xiàn)高可靠,則需要對RDD執(zhí)行Checkpoint操作
JVM垃圾回收調(diào)優(yōu)
由于Spark是基于內(nèi)存的計算引擎,RDD緩存的數(shù)據(jù),以及算子執(zhí)行期間創(chuàng)建的對象都是放在內(nèi)存中的,所以針對Spark任務(wù)如果內(nèi)存設(shè)置不合理會導(dǎo)致大部分時間都消耗在垃圾回收上。
對于垃圾回收來說,最重要的就是調(diào)節(jié)RDD緩存占用的內(nèi)存空間,和算子執(zhí)行時創(chuàng)建的對象占用的內(nèi)存空間的比例。默認情況下,Spark使用每個 executor 60% 的內(nèi)存空間來緩存RDD,那么只有 40% 的內(nèi)存空間來存放算子執(zhí)行期間創(chuàng)建的對象。在這種情況下,可能由于內(nèi)存空間的不足,并且算子對應(yīng)的task任務(wù)在運行時創(chuàng)建的對象過大,那么一旦發(fā)現(xiàn) 40% 的內(nèi)存空間不夠用了,就會觸發(fā)Java虛擬機的垃圾回收操作。因此在極端情況下,垃圾回收操作可能會被頻繁觸發(fā)。在這種情況下,如果發(fā)現(xiàn)垃圾回收頻繁發(fā)生。那么就需要對這個比例進行調(diào)優(yōu)了spark.storage.memoryFraction 參數(shù)的值默認是 0.6 。使用 SparkConf().set(“spark.storage.memoryFraction”, “0.5”) 可以進行修改,就是將RDD緩存占用內(nèi)存空間的比例降低為 50% ,從而提供更多的內(nèi)存空間來保存task運行時創(chuàng)建的對象。
因此,對于RDD持久化而言,完全可以使用Kryo序列化,加上降低其executor內(nèi)存占比的方式,來減少其內(nèi)存消耗。給task提供更多的內(nèi)存,從而避免task在執(zhí)行時頻繁觸發(fā)垃圾回收。我們可以對task的垃圾回收進行監(jiān)測,在spark的任務(wù)執(zhí)行界面,可以查看每個task執(zhí)行消耗的時間,以及task gc消耗的時間。
重新向集群中提交checkpoint的代碼,查看spark任務(wù)的task指標信息
確保Hadoop集群、yarn的historyserver進程以及spark的historyserver進程是正常運行的刪除checkpoint任務(wù)的輸出目錄
[root@bigdata04 sparkjars]# hdfs dfs -rm -r /out-chk001
提交任務(wù)
[root@bigdata04 sparkjars]# sh -x checkPointJob.sh
點擊生成的第一個job,再點擊進去查看這個job的stage,進入第一個stage,查看task的執(zhí)行情況,看這里面的GC time的數(shù)值會不會比較大,最直觀的就是如果gc time這里標紅了,則說明gc時間過長。
上面這個是分任務(wù)查看,其實還可以查看全局的,看Executor進程中整個任務(wù)執(zhí)行總時間和gc的消耗時間。
既然說到了Java中的GC,那我們就需要說道說道了。
Java堆空間被劃分成了兩塊空間:一個是年輕代,一個是老年代。
年輕代放的是短時間存活的對象
老年代放的是長時間存活的對象。
年輕代又被劃分了三塊空間, Eden、Survivor1、Survivor2
年輕代占堆內(nèi)存的1/3,老年代占堆內(nèi)存的2/3
其中年輕代又被劃分了三塊, Eden,Survivor1,Survivor2 的比例為 8:1:1
Eden區(qū)域和Survivor1區(qū)域用于存放對象,Survivor2區(qū)域備用。
我們創(chuàng)建的對象,首先會放入Eden區(qū)域,如果Eden區(qū)域滿了,那么就會觸發(fā)一次Minor GC,進行年輕代的垃圾回收(其實就是回收Eden區(qū)域內(nèi)沒有人使用的對象),然后將存活的對象存入Survivor1區(qū)域,再創(chuàng)建對象的時候繼續(xù)放入Eden區(qū)域。第二次Eden區(qū)域滿了,那么Eden和Survivor1區(qū)域中存活的對象,當(dāng)?shù)谌蜤den區(qū)域再滿了的時候,Eden和Survivor2區(qū)域中存活的對象,會一塊被移動到Survivor1區(qū)域中,按照這個規(guī)律進行循環(huán)。
注意了,Full GC是一個重量級的垃圾回收,Full GC執(zhí)行的時候,程序是處于暫停狀態(tài)的,這樣會非常影響性能。
1:最直接的就是提高Executor的內(nèi)存
在spark-submit中通過參數(shù)指定executor的內(nèi)存
--executor-memory 1G
2:調(diào)整Eden與s1和s2的比值【一般情況下不建議調(diào)整這塊的比值】
- -XX:NewRatio=4:設(shè)置年輕代(包括Eden和兩個Survivor區(qū))與年老代的比值(除去持久代).設(shè)置為4,則年輕代與年老代所占比值為1:4,年輕代占整個堆棧的1/5
- -XX:SurvivorRatio=4:設(shè)置年輕代中Eden區(qū)與Survivor區(qū)的大小比值.設(shè)置為4,則兩個Survivor區(qū)與一個Eden區(qū)的比值為2:4,一個Survivor區(qū)占整個年輕代的1/6
具體使用的時候在 spark-submit 腳本中通過 --conf 參數(shù)設(shè)置即可
--conf "spark.executor.extraJavaOptions= -XX:SurvivorRatio=4 -XX:NewRatio=4"
其實最直接的就是增加Executor的內(nèi)存,如果這個內(nèi)存上不去,其它的修改都是徒勞。舉個例子就是說,一個20歲的成年人和一個3歲的小孩。3歲的小孩掌握再多的格斗技巧都沒有用,在絕對的實力面前一切都是花架子。所以說我們一般很少需要去調(diào)整Eden、s1、s2的比值,一般都是直接增加Executor的內(nèi)存比較靠譜。
提高并行度
實際上Spark集群的資源并不一定會被充分利用到,所以要盡量設(shè)置合理的并行度,來充分地利用集群的資源,這樣才能提高Spark程序的性能。
Spark會自動設(shè)置以文件作為輸入源的RDD的并行度,依據(jù)其大小,比如HDFS,就會給每一個block創(chuàng)建一個partition,也依據(jù)這個設(shè)置并行度。對于reduceByKey等會發(fā)生shuffle操作的算子,會使用并行度最大的父RDD的并行度??梢允謩邮褂?textFile()、parallelize() 等方法的第二個參數(shù)來設(shè)置并行度;也可以使用 spark.default.parallelism 參數(shù),來設(shè)置統(tǒng)一的并行度。Spark官方的推薦是,給集群中的每個cpu core設(shè)置 2~3 個
下面來舉個例子
我在 spark-submit 腳本中給任務(wù)設(shè)置了5 個executor,每個executor,設(shè)置了2個cpu core
spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 5 \
--executor-cores 2 \
.....
此時,如果我在代碼中設(shè)置了默認并行度為5
conf.set("spark.default.parallelism","5")
這個參數(shù)設(shè)置完了以后,也就意味著所有RDD的partition都被設(shè)置成了5個,針對RDD的每一個partition,spark會啟動一個task來進行計算,所以對于所有的算子操作,都只會創(chuàng)建5個task來處理對應(yīng)的RDD中的數(shù)據(jù)。
但是注意了,我們前面在spark-submit腳本中設(shè)置了5個executor,每個executor 2個cpu core,所以這個時候spark其實會向yarn集群申請10個cpu core,但是我們在代碼中設(shè)置了默認并行度為5,只會產(chǎn)生5個task,一個task使用一個cpu core,那也就意味著有5個cpu core是空閑的,這樣申請的資源就浪費了一半。
其實最好的情況,就是每個cpu core都不閑著,一直在運行,這樣可以達到資源的最大使用率,其實讓一個cpu core運行一個task都是有點浪費的,官方也建議讓每個cpu core運行2~3個task,這樣可以充分壓榨CPU的性能。
是這樣的,因為每個task執(zhí)行的順序和執(zhí)行結(jié)束的時間很大概率是不一樣的,如果正好有10個cpu,運行10個taks,那么某個task可能很快就執(zhí)行完了,那么這個CPU就空閑下來了,這樣資源就浪費了。所以說官方推薦,給每個cpu分配2~3個task是比較合理的,可以充分利用CPU資源,發(fā)揮它最大的價值。下面我們來實際寫個案例看一下效果
Scala代碼如下:
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:設(shè)置并行度
* 1:可以在textFile或者parallelize等方法的第二個參數(shù)中設(shè)置并行度
* 2:或者通過spark.default.parallelism參數(shù)統(tǒng)一設(shè)置并行度
*/
object MoreParallelismScala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("MoreParallelismScala")//設(shè)置全局并行度conf.set("spark.default.parallelism","5")val sc = new SparkContext(conf)val dataRDD = sc.parallelize(Array("hello","you","hello","me","hehe","heldataRDD.map((_,1)).reduceByKey(_ + _).foreach(println(_))sc.stop()}
}
對代碼編譯打包
spark-submit腳本內(nèi)容如下:
[root@bigdata04 sparkjars]# vi moreParallelismJob.sh
spark-submit \
--class com.imooc.scala.MoreParallelismScala \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 5 \
--executor-cores 2 \
db_spark-1.0-SNAPSHOT-jar-with-dependencies.jar
任務(wù)提交到集群運行之后,查看spark的任務(wù)界面
先看executors,這里顯示了4個executor和1個driver進程,為什么不是5個executor進程呢?
是因為我們現(xiàn)在使用的是yarn-cluster模式,driver進程運行在集群內(nèi)部,所以它占了一個executor,如果使用的是yarn-client模式,就會產(chǎn)生5個executor和1個單獨的driver進程。
然后去看satges界面,兩個Stage都是5個task并行執(zhí)行,這5個task會使用5個cpu,但是我們給這個任務(wù)申請了10個cpu,所以就有5個是空閑的了。
如果想要最大限度利用CPU的性能,至少將 spark.default.parallelism 的值設(shè)置為10,這樣可以實現(xiàn)一個cpu運行一個task,其實官方推薦是設(shè)置為20或者30。
其實這個參數(shù)也可以在spark-submit腳本中動態(tài)設(shè)置,通過 --conf 參數(shù)設(shè)置,這樣就比較靈活了。
這就是并行度相關(guān)的設(shè)置
接下來我們來看一個圖,加深一下理解
這個圖中描述的就是剛才我們演示的兩種情況下Executor和Task之間的關(guān)系。
最后我們來分析總結(jié)一下spark-submit腳本中經(jīng)常配置的一些參數(shù)
--name mySparkJobName:指定任務(wù)名稱
--class com.imooc.scala.xxxxx :指定入口類
--master yarn :指定集群地址,on yarn模式指定yarn
--deploy-mode cluster :client代表yarn-client,cluster代表yarn-cluster
--executor-memory 1G :executor進程的內(nèi)存大小,實際工作中設(shè)置2~4G即可
--num-executors 2 :分配多少個executor進程
--executor-cores 2 : 一個executor進程分配多少個cpu core
--driver-cores 1 :driver進程分配多少cpu core,默認為1即可
--driver-memory 1G:driver進程的內(nèi)存,如果需要使用類似于collect之類的action算子向
--jars fastjson.jar,abc.jar 在這里可以設(shè)置job依賴的第三方j(luò)ar包【不建議把第三方依賴
--conf "spark.default.parallelism=10":可以動態(tài)指定一些spark任務(wù)的參數(shù),指定多個參
最后注意一點:針對 --num-executors 和 --executor-cores 的設(shè)置
看這兩種方式設(shè)置有什么區(qū)別:
第一種方式:
--num-executors 2
--executor-cores 1
第二種方式:
--num-executors 1
--executor-cores 2
這兩種設(shè)置最終都會向集群申請2個cpu core,可以并行運行兩個task,但是這兩種設(shè)置方式有什么區(qū)別呢?
-
第一種方法:多executor模式
由于每個executor只分配了一個cpu core,我們將無法利用在同一個JVM中運行多個任務(wù)的優(yōu)點。 我們假設(shè)這兩個executor是在兩個節(jié)點中啟動的,那么針對廣播變量這種操作,將在兩個節(jié)點的中都復(fù)制1份,最終會復(fù)制兩份 -
第二種方法:多core模式
此時一個executor中會有2個cpu core,這樣可以利用同一個JVM中運行多個任務(wù)的優(yōu)點,并且針對廣播變量的這種操作,只會在這個executor對應(yīng)的節(jié)點中復(fù)制1份即可。
那是不是我可以給一個executor分配很多的cpu core,也不是的,因為一個executor的內(nèi)存大小是固定的,如果在里面運行過多的task可能會導(dǎo)致內(nèi)存不夠用,所以這塊一般在工作中我們會給一executor分配 2~4G 內(nèi)存,對應(yīng)的分配 2~4 個cpu core。
數(shù)據(jù)本地化
數(shù)據(jù)本地化對于Spark Job性能有著巨大的影響。如果數(shù)據(jù)以及要計算它的代碼是在一起的,那么性能當(dāng)然會非常高。但是,如果數(shù)據(jù)和計算它的代碼是分開的,那么其中之一必須到另外一方的機器上。通常來說,移動代碼到其它節(jié)點,會比移動數(shù)據(jù)到代碼所在的節(jié)點,速度要得多,因為代碼比較小。Spark也正是基于這個數(shù)據(jù)本地化的原則來構(gòu)建task調(diào)度算法的。
數(shù)據(jù)本地化,指的是,數(shù)據(jù)離計算它的代碼有多近?;跀?shù)據(jù)距離代碼的距離,有幾種數(shù)據(jù)本地化級別:
數(shù)據(jù)本地化級別 解釋
PROCESS_LOCAL 進程本地化,性能最好:數(shù)據(jù)和計算它的代碼在同一個JVM進程中
NODE_LOCAL 節(jié)點本地化:數(shù)據(jù)和計算它的代碼在一個節(jié)點上,但是不在一個JVM進程
NO_PREF 數(shù)據(jù)從哪里過來,性能都是一樣的,比如從數(shù)據(jù)庫中獲取數(shù)據(jù),對于task
RACK_LOCAL 數(shù)據(jù)和計算它的代碼在一個機架上,數(shù)據(jù)需要通過網(wǎng)絡(luò)在節(jié)點之間進行傳
ANY 數(shù)據(jù)可能在任意地方,比如其它網(wǎng)絡(luò)環(huán)境內(nèi),或者其它機架上,性能最差
Spark傾向使用最好的本地化級別調(diào)度task,但這是不現(xiàn)實的
如果目前我們要處理的數(shù)據(jù)所在的executor上目前沒有空閑的CPU,那么Spark就會放低本地化級別。這時有兩個選擇:
第一,等待,直到executor上的cpu釋放出來,那么就分配task過去;
第二,立即在任意一個其它executor上啟動一個task。
Spark默認會等待指定時間,期望task要處理的數(shù)據(jù)所在的節(jié)點上的executor空閑出一個cpu,從而將task分配過去,只要超過了時間,那么Spark就會將task分配到其它任意一個空閑的executor上可以設(shè)置參數(shù), spark.locality 系列參數(shù),來調(diào)節(jié)Spark等待task可以進行數(shù)據(jù)本地化的時間
spark.locality.wait(3000毫秒):默認等待3秒
spark.locality.wait.process:等待指定的時間看能否達到數(shù)據(jù)和計算它的代碼在同一個JVM
spark.locality.wait.node:等待指定的時間看能否達到數(shù)據(jù)和計算它的代碼在一個節(jié)點上執(zhí)行
spark.locality.wait.rack:等待指定的時間看能否達到數(shù)據(jù)和計算它的代碼在一個機架上
看這個圖里面的task,此時的數(shù)據(jù)本地化級別是最優(yōu)的 PROCESS_LOCAL
Spark性能優(yōu)化之算子優(yōu)化
map vs mapPartitions
- map 操作:對 RDD 中的每個元素進行操作,一次處理一條數(shù)據(jù)
- mapPartitions 操作:對 RDD 中每個 partition 進行操作,一次處理一個分區(qū)的數(shù)據(jù)所以:
- map 操作: 執(zhí)行 1 次 map算子只處理 1 個元素,如果 partition 中的元素較多,假設(shè)當(dāng)前已經(jīng)處理了 1000 個元素,在內(nèi)存不足的情況下,Spark 可以通過GC等方法回收內(nèi)存(比如將已處理掉的1000 個元素從內(nèi)存中回收)。因此, map 操作通常不會導(dǎo)致OOM異常;
- mapPartitions 操作: 執(zhí)行 1 次map算子需要接收該 partition 中的所有元素,因此一旦元素很多而內(nèi)存不足,就容易導(dǎo)致OOM的異常,也不是說一定就會產(chǎn)生OOM異常,只是和map算子對比的話,相對來說容易產(chǎn)生OOM異常
不過一般情況下,mapPartitions 的性能更高;初始化操作、數(shù)據(jù)庫鏈接等操作適合使用 mapPartitions操作。
這是因為:假設(shè)需要將 RDD 中的每個元素寫入數(shù)據(jù)庫中,這時候就應(yīng)該把創(chuàng)建數(shù)據(jù)庫鏈接的操作放置在mapPartitions 中,創(chuàng)建數(shù)據(jù)庫鏈接這個操作本身就是個比較耗時的,如果該操作放在 map 中執(zhí)行,將會頻繁執(zhí)行,比較耗時且影響數(shù)據(jù)庫的穩(wěn)定性。
scala代碼實現(xiàn)如下
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
/**
* 需求:mapPartitons的使用
*/
object MapPartitionsOpScala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("MapPartitionsOpScala").setMaster("local")val sc = new SparkContext(conf)//設(shè)置分區(qū)數(shù)量為2val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)//map算子一次處理一條數(shù)據(jù)/*val sum = dataRDD.map(item=>{println("==============")item * 2}).reduce( _ + _)*///mapPartitions算子一次處理一個分區(qū)的數(shù)據(jù)val sum = dataRDD.mapPartitions(it=>{//建議針對初始化鏈接之類的操作,使用mapPartitions,放在mapPartitions內(nèi)部//例如:創(chuàng)建數(shù)據(jù)庫鏈接,使用mapPartitions可以減少鏈接創(chuàng)建的次數(shù),提高性能//注意:創(chuàng)建數(shù)據(jù)庫鏈接的代碼建議放在次數(shù),不要放在Driver端或者it.foreach內(nèi)部//數(shù)據(jù)庫鏈接放在Driver端會導(dǎo)致鏈接無法序列化,無法傳遞到對應(yīng)的task中執(zhí)行,所以//數(shù)據(jù)庫鏈接放在it.foreach()內(nèi)部還是會創(chuàng)建多個鏈接,和使用map算子的效果是一樣println("==================")val result = new ArrayBuffer[Int]()//這個foreach是調(diào)用的scala里面的函數(shù)it.foreach(item=>{result.+=(item * 2)})//關(guān)閉數(shù)據(jù)庫鏈接result.toIterator}).reduce(_ + _)println("sum:"+sum)sc.stop()}
}
foreach vs foreachPartition
foreach:一次處理一條數(shù)據(jù)
foreachPartition:一次處理一個分區(qū)的數(shù)據(jù)
foreachPartition的特性和mapPartitions 的特性是一樣的,唯一的區(qū)別就是mapPartitions 是 transformation 操作(不會立即執(zhí)行),foreachPartition是 action 操作(會立即執(zhí)行)
Scala代碼如下:
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:foreachPartition的使用
*/
object ForeachPartitionOpScala {
def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("ForeachPartitionOpScala").setMaster("local")val sc = new SparkContext(conf)//設(shè)置分區(qū)數(shù)量為2val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)//foreachPartition:一次處理一個分區(qū)的數(shù)據(jù),作用和mapPartitions類似//唯一的區(qū)是mapPartitions是transformation算子,foreachPartition是action算子dataRDD.foreachPartition(it=>{//在此處獲取數(shù)據(jù)庫鏈接println("===============")it.foreach(item=>{//在這里使用數(shù)據(jù)庫鏈接println(item)})//關(guān)閉數(shù)據(jù)庫鏈接})sc.stop()}
}
repartition的使用
對RDD進行重分區(qū),repartition主要有兩個應(yīng)用場景:
- 可以調(diào)整RDD的并行度
針對個別RDD,如果感覺分區(qū)數(shù)量不合適,想要調(diào)整,可以通過repartition進行調(diào)整,分區(qū)調(diào)整了之后,對應(yīng)的并行度也就可以調(diào)整了 - 可以解決RDD中數(shù)據(jù)傾斜的問題
如果RDD中不同分區(qū)之間的數(shù)據(jù)出現(xiàn)了數(shù)據(jù)傾斜,可以通過repartition實現(xiàn)數(shù)據(jù)重新分發(fā),可以均勻分發(fā)到不同分區(qū)中
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:repartition的使用
* */
object RepartitionOpScala {
def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("RepartitionOpScala").setMaster("local")val sc = new SparkContext(conf)//設(shè)置分區(qū)數(shù)量為2val dataRDD = sc.parallelize(Array(1,2,3,4,5),2)//重新設(shè)置RDD的分區(qū)數(shù)量為3,這個操作會產(chǎn)生shuffle//也可以解決RDD中數(shù)據(jù)傾斜的問題dataRDD.repartition(3).foreachPartition(it=>{println("=========")it.foreach(println(_))})//通過repartition可以控制輸出數(shù)據(jù)產(chǎn)生的文件個數(shù)dataRDD.saveAsTextFile("hdfs://bigdata01:9000/rep-001")dataRDD.repartition(1).saveAsTextFile("hdfs://bigdata01:9000/rep-002")sc.stop()}
}
reduceByKey和groupByKey的區(qū)別
在實現(xiàn)分組聚合功能時這兩個算子有什么區(qū)別?
看這兩行代碼
val counts = wordCountRDD.reduceByKey(_ + _)
val counts = wordCountRDD.groupByKey().map(wc => (wc._1, wc._2.sum))
這兩行代碼的最終效果是一樣的,都是對wordCountRDD中每個單詞出現(xiàn)的次數(shù)進行聚合統(tǒng)計。
那這兩種方式在原理層面有什么區(qū)別嗎?
首先這兩個算子在執(zhí)行的時候都會產(chǎn)生shuffle
但是:
1:當(dāng)采用reduceByKey時,數(shù)據(jù)在進行shuffle之前會先進行局部聚合
2:當(dāng)使用groupByKey時,數(shù)據(jù)在shuffle之間不會進行局部聚合,會原樣進行shuffle
這樣的話reduceByKey就減少了shuffle的數(shù)據(jù)傳送,所以效率會高一些。
下面來看這個圖,加深一下理解
從圖中可以看出來reduceByKey在shuffle之前會先對數(shù)據(jù)進行局部聚合,而groupByKey不會,所以在實現(xiàn)分組聚合的需求中,reduceByKey性能略勝一籌。