淮北市住房和城鄉(xiāng)建設(shè)局網(wǎng)站出售外鏈
一、Shuffle 的作用是什么?
? ? Shuffle 操作可以理解為將集群中各個(gè)節(jié)點(diǎn)上的數(shù)據(jù)進(jìn)行重新整理和分類(lèi)的過(guò)程。這一概念源自 Hadoop 的 MapReduce 模型,Shuffle 是連接 Map 階段和 Reduce 階段的關(guān)鍵環(huán)節(jié)。在分布式計(jì)算中,每個(gè)計(jì)算節(jié)點(diǎn)通常只處理任務(wù)的一部分?jǐn)?shù)據(jù)。如果下一個(gè)階段需要依賴(lài)前一個(gè)階段的所有計(jì)算結(jié)果,就需要對(duì)這些結(jié)果進(jìn)行重新整合和分類(lèi),這就是 Shuffle 的主要任務(wù)。在 Spark 中,RDD 之間的依賴(lài)關(guān)系分為窄依賴(lài)和寬依賴(lài),其中寬依賴(lài)涉及 Shuffle 操作。因此,在 Spark 程序中,每個(gè) job 的階段(stage)劃分依據(jù)就是是否存在 Shuffle 操作,每個(gè) stage 包含一系列的 RDD map 操作。
二、為什么 Shuffle 操作耗時(shí)?
? ? Shuffle 操作需要對(duì)數(shù)據(jù)進(jìn)行重新聚合和劃分,并將這些數(shù)據(jù)分配到集群的各個(gè)節(jié)點(diǎn)上進(jìn)行下一步的處理。這一過(guò)程中,不同節(jié)點(diǎn)之間需要進(jìn)行大量的數(shù)據(jù)交換。由于數(shù)據(jù)傳輸需要通過(guò)網(wǎng)絡(luò),并且通常需要先將數(shù)據(jù)寫(xiě)入磁盤(pán),因此每個(gè)節(jié)點(diǎn)都會(huì)進(jìn)行大量的文件讀寫(xiě)操作。這些讀寫(xiě)操作不僅增加了 I/O 開(kāi)銷(xiāo),還可能導(dǎo)致網(wǎng)絡(luò)擁塞,從而使 Shuffle 操作變得非常耗時(shí),相比之下,簡(jiǎn)單的 map 操作則要快得多。
三、Spark 當(dāng)前的ShuffleManager模式及處理機(jī)制
? 在 Spark 程序中,Shuffle 操作由 ShuffleManager 對(duì)象進(jìn)行管理。目前,Spark 支持兩種主要的 ShuffleManager 模式:HashShuffleManager 和 SortShuffleManager。Shuffle 操作包括當(dāng)前階段的 Shuffle Write(寫(xiě)入磁盤(pán))和下一階段的 Shuffle Read(讀取),這兩種模式的主要區(qū)別在于 Shuffle Write 階段的處理方式。
3.1、HashShuffleManager
HashShuffleManager 是 Spark 最初使用的 ShuffleManager 模式。在這種模式下,每個(gè)任務(wù)(task)會(huì)為每個(gè)分區(qū)(partition)創(chuàng)建一個(gè)臨時(shí)文件,并將數(shù)據(jù)直接寫(xiě)入對(duì)應(yīng)的文件中。這種方式簡(jiǎn)單直觀,但在處理大量分區(qū)時(shí)會(huì)產(chǎn)生大量的小文件,導(dǎo)致磁盤(pán) I/O 開(kāi)銷(xiāo)增加。此外,每個(gè)任務(wù)都需要為每個(gè)分區(qū)打開(kāi)和關(guān)閉文件,這也會(huì)增加文件句柄的開(kāi)銷(xiāo)。
3.2、SortShuffleManager
SortShuffleManager 是目前 Spark 默認(rèn)使用的 ShuffleManager 模式。在這種模式下,任務(wù)會(huì)先對(duì)數(shù)據(jù)進(jìn)行排序,然后將排序后的數(shù)據(jù)寫(xiě)入一個(gè)或幾個(gè)大文件中。這種方式減少了文件的數(shù)量,提高了磁盤(pán) I/O 效率。此外,SortShuffleManager 還支持?jǐn)?shù)據(jù)的內(nèi)存緩存,只有在內(nèi)存不足時(shí)才會(huì)將數(shù)據(jù)溢寫(xiě)到磁盤(pán),從而進(jìn)一步提高了性能。
四、Spark 程序的 Shuffle 調(diào)優(yōu)
Shuffle 階段需要將數(shù)據(jù)寫(xiě)入磁盤(pán),這涉及到大量的讀寫(xiě)文件操作和文件傳輸操作,對(duì)節(jié)點(diǎn)的系統(tǒng) I/O 有較大的影響。通過(guò)調(diào)整一些關(guān)鍵參數(shù),可以減少 Shuffle 階段的文件數(shù)量和 I/O 讀寫(xiě)次數(shù),從而提高性能。以下是幾個(gè)主要的調(diào)優(yōu)參數(shù):
1、spark.shuffle.manager:設(shè)置 Spark 任務(wù)的 ShuffleManager 模式。對(duì)于 Spark 1.2 以上版本,默認(rèn)值為 sort
,即在 Shuffle Write 階段會(huì)對(duì)數(shù)據(jù)進(jìn)行排序,每個(gè) executor 上生成的文件會(huì)合并成兩個(gè)文件(一個(gè)數(shù)據(jù)文件和一個(gè)索引文件)。通常情況下,默認(rèn)的 sort
模式已經(jīng)能夠提供較好的性能,除非有特殊情況,一般不需要更改此參數(shù)。
2、spark.shuffle.sort.bypassMergeThreshold:設(shè)置啟用 bypass 機(jī)制的閾值。如果 Shuffle Read 階段的 task 數(shù)量小于或等于該值,則 Shuffle Write 階段會(huì)啟用 bypass 機(jī)制。默認(rèn)值為 200。如果 Shuffle Read 階段的 task 數(shù)量較少,可以適當(dāng)降低這個(gè)閾值,以啟用 bypass 機(jī)制,減少文件合并操作,提高性能。
3、spark.shuffle.file.buffer:設(shè)置 Shuffle Write 階段寫(xiě)文件時(shí)緩沖區(qū)的大小。默認(rèn)值為 32MB。如果內(nèi)存資源充足,可以將該值調(diào)大(例如 64MB),以減少 executor 的 I/O 讀寫(xiě)次數(shù),提高寫(xiě)入速度。
4、spark.shuffle.io.maxRetries:設(shè)置 Shuffle Read 階段 fetch 數(shù)據(jù)時(shí)的最大重試次數(shù)。默認(rèn)值為 3 次。如果 Shuffle 階段的數(shù)據(jù)量很大,網(wǎng)絡(luò)環(huán)境不穩(wěn)定,可以適當(dāng)增加重試次數(shù),以提高數(shù)據(jù)傳輸?shù)某晒β省?/p>
除了上述參數(shù)外,還有一些其他常用的 Shuffle 調(diào)優(yōu)參數(shù),可以幫助進(jìn)一步優(yōu)化性能:
1、spark.shuffle.compress:是否啟用 Shuffle 數(shù)據(jù)的壓縮。默認(rèn)值為 true
。啟用壓縮可以減少網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量,但會(huì)增加 CPU 負(fù)載。如果網(wǎng)絡(luò)帶寬是瓶頸,建議開(kāi)啟壓縮;如果 CPU 是瓶頸,可以考慮關(guān)閉壓縮。
2、spark.shuffle.spill:是否啟用 Shuffle 數(shù)據(jù)的溢寫(xiě)(spill)。默認(rèn)值為 true
。啟用溢寫(xiě)可以防止內(nèi)存不足導(dǎo)致的任務(wù)失敗,但會(huì)增加磁盤(pán) I/O 開(kāi)銷(xiāo)。如果內(nèi)存資源充足,可以考慮關(guān)閉溢寫(xiě)以提高性能。
3、spark.shuffle.spill.compress:是否啟用 Shuffle 溢寫(xiě)數(shù)據(jù)的壓縮。默認(rèn)值為 true
。啟用壓縮可以減少磁盤(pán) I/O 開(kāi)銷(xiāo),但會(huì)增加 CPU 負(fù)載。如果磁盤(pán) I/O 是瓶頸,建議開(kāi)啟壓縮;如果 CPU 是瓶頸,可以考慮關(guān)閉壓縮。
4、spark.shuffle.memoryFraction:分配給 Shuffle 操作的內(nèi)存比例。默認(rèn)值為 0.66。根據(jù)實(shí)際內(nèi)存情況調(diào)整該值,以平衡 Shuffle 操作和其他操作的內(nèi)存需求。
5、spark.shuffle.manager.numPartitions:設(shè)置 Shuffle 操作的分區(qū)數(shù)。默認(rèn)值根據(jù)數(shù)據(jù)量自動(dòng)確定。合理設(shè)置分區(qū)數(shù),避免過(guò)多或過(guò)少的分區(qū)。過(guò)多的分區(qū)會(huì)導(dǎo)致更多的網(wǎng)絡(luò)通信,過(guò)少的分區(qū)可能導(dǎo)致數(shù)據(jù)傾斜。
通過(guò)調(diào)整這些參數(shù),可以顯著改善 Shuffle 階段的性能,從而提升整個(gè) Spark 應(yīng)用的效率。