旅游平臺網(wǎng)站合作建設(shè)方案線上運營推廣
Spark 磁盤作用
- 磁盤作用
- 性能價值
- 失敗重試
- ReuseExchange
Spark 導(dǎo)航
磁盤作用
臨時文件、中間文件、緩存數(shù)據(jù),都會存儲到 spark.local.dir
中
- 在 Shuffle Map 時, 當(dāng)內(nèi)存空間不足,就會溢出臨時文件存儲到磁盤上
- 溢出的臨時文件一起做歸并計算,得到 Shuffle 中間文件存儲到磁盤上
- 緩存分布式數(shù)據(jù)集 : DISK 的存儲模式,會把內(nèi)存中放不下的數(shù)據(jù)緩存到磁盤
性能價值
spark.local.dir
配置到 SDD 或訪問高效的存儲系統(tǒng)
磁盤復(fù)用 :
- 給執(zhí)行性能帶來更好的提升
- 磁盤復(fù)用 : Shuffle Write 產(chǎn)生的中間文件被多次利用
失敗重試
一旦某個計算環(huán)節(jié)出錯,就會觸發(fā)失敗重試。失敗重試的觸發(fā)點是距離最新的 Shuffle 的中間文件
當(dāng) RDD4 的計算任務(wù)失敗時,會從 RDD4 向前回溯,回溯到 RDD3 (RDD2 輸出的中間文件 ) ,并重新開始計算
ReuseExchange
ReuseExchange 是 Spark SQL 優(yōu)化一種 : 相同或相似的物理計劃能共享 Shuffle 中間文件
ReuseExchange 機制的觸發(fā)條件:
- 多個查詢所依賴的分區(qū)規(guī)則要與 Shuffle 中間數(shù)據(jù)的分區(qū)規(guī)則保持一致
- 多個查詢所涉及的字段(Attributes)要保持一致
統(tǒng)計不同用戶的 PV(Page Views,頁面瀏覽量)、UV(Unique Views,網(wǎng)站獨立訪客),并把兩項統(tǒng)計結(jié)果合并:
//版本1:分別計算PV、UV,然后合并
// Data schema (userId: String, accessTime: Timestamp, page: String)
val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath)val dfPV: DataFrame = df.groupBy("userId").agg(count("page").alias("value"))
val dfUV: DataFrame = df.groupBy("userId").agg(countDistinct("page").alias("value"))val resultDF: DataFrame = dfPV.Union(dfUV)
// Result樣例
| userId | metrics | value |
| user0 | PV | 25 |
| user0 | UV | 12 |
文件掃描/Shuffle 兩次 :
以 userId 為分區(qū) ,調(diào)用 repartition :
//版本2:分別計算PV、UV,然后合并
// Data schema (userId: String, accessTime: Timestamp, page: String)
val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath).repartition($"userId")val dfPV: DataFrame = df.groupBy("userId").agg(count("page").alias("value"))
val dfUV: DataFrame = df.groupBy("userId").agg(countDistinct("page").alias("value"))val resultDF: DataFrame = dfPV.Union(dfUV)
// Result樣例
| userId | metrics | value |
| user0 | PV | 25 |
| user0 | UV | 12 |
ReuseExchange :
- 數(shù)據(jù)源只需掃描一遍
- Shuffle 也只發(fā)生一次