順德網(wǎng)站建設(shè)策劃做網(wǎng)站多少錢
Hive數(shù)據(jù)傾斜以及解決方案
1、什么是數(shù)據(jù)傾斜
數(shù)據(jù)傾斜主要表現(xiàn)在,map/reduce程序執(zhí)行時(shí),reduce節(jié)點(diǎn)大部分執(zhí)行完畢,但是有一個(gè)或者幾個(gè)reduce節(jié)點(diǎn)運(yùn)行很慢,導(dǎo)致整個(gè)程序的處理時(shí)間很長,這是因?yàn)槟骋粋€(gè)key的條數(shù)比其他key多很多(有時(shí)是百倍或者千倍之多),這條Key所在的reduce節(jié)點(diǎn)所處理的數(shù)據(jù)量比其他節(jié)點(diǎn)就大很多,從而導(dǎo)致某幾個(gè)節(jié)點(diǎn)遲遲運(yùn)行不完。
2、數(shù)據(jù)傾斜的原因
一些操作有關(guān):
關(guān)鍵詞 | 情形 | 后果 |
---|---|---|
Join | 其中一個(gè)表較小,但是key集中 | 分發(fā)到某一個(gè)或幾個(gè)Reduce上的數(shù)據(jù)遠(yuǎn)高于平均值 |
大表與大表,但是分桶的判斷字段0值或空值過多 | 這些空值都由一個(gè)reduce處理,非常慢 | |
group by | group by 維度過小,某值的數(shù)量過多 | 處理某值的reduce灰常耗時(shí) |
Count Distinct | 某特殊值過多 | 處理此特殊值的reduce耗時(shí) |
原因歸納:
- key分布不均勻
- 業(yè)務(wù)數(shù)據(jù)本身的特性
- 建表時(shí)考慮不周
- 某些SQL語句本身就有數(shù)據(jù)傾斜
現(xiàn)象:
任務(wù)進(jìn)度長時(shí)間維持在99%(或100%),查看任務(wù)監(jiān)控頁面,發(fā)現(xiàn)只有少量(1個(gè)或幾個(gè))reduce子任務(wù)未完成。因?yàn)槠涮幚淼臄?shù)據(jù)量和其他reduce差異過大。單一reduce的記錄數(shù)與平均記錄數(shù)差異過大,通常可能達(dá)到3倍甚至更多。 最長時(shí)長遠(yuǎn)大于平均時(shí)長。
3、數(shù)據(jù)傾斜的解決方案
1)參數(shù)調(diào)節(jié)
Map 端部分聚合,相當(dāng)于Combiner
hive.map.aggr = true
有數(shù)據(jù)傾斜的時(shí)候進(jìn)行負(fù)載均衡,當(dāng)選項(xiàng)設(shè)定為true,生成的查詢計(jì)劃會(huì)有兩個(gè)MR Job。第一個(gè)MR Job中,Map 的輸出結(jié)果集合會(huì)隨機(jī)分布到Reduce中,每個(gè)Reduce做部分聚合操作,并輸出結(jié)果,這樣處理的結(jié)果是相同的Group By Key有可能被分發(fā)到不同的Reduce中,從而達(dá)到負(fù)載均衡的目的;第二個(gè)MRJob再根據(jù)預(yù)處理的數(shù)據(jù)結(jié)果按照Group By Key分布到Reduce中(這個(gè)過程可以保證相同的Group By Key被分布到同一個(gè)Reduce中),最后完成最終的聚合操作。
hive.groupby.skewindata=true
2)SQL語句調(diào)節(jié)
如何join:
關(guān)于驅(qū)動(dòng)表的選取,選用join key分布最均勻的表作為驅(qū)動(dòng)表,做好列裁剪和filter操作,以達(dá)到兩表做join的時(shí)候,數(shù)據(jù)量相對(duì)變小的效果。
大小表Join:
使用map join讓小的維度表(1000條以下的記錄條數(shù))先進(jìn)內(nèi)存。在map端完成reduce。
大表Join大表:
把空值的key變成一個(gè)字符串加上隨機(jī)數(shù),把傾斜的數(shù)據(jù)分到不同的reduce上,由于null值關(guān)聯(lián)不上,處理后并不影響最終結(jié)果。
count distinct大量相同特殊值:
count distinct時(shí),將值為空的情況單獨(dú)處理,如果是計(jì)算count distinct,可以不用處理,直接過濾,在最后結(jié)果中加1。如果還有其他計(jì)算,需要進(jìn)行g(shù)roup by,可以先將值為空的記錄單獨(dú)處理,再和其他計(jì)算結(jié)果進(jìn)行union。
group by維度過小:
采用sum() group by的方式來替換count(distinct)完成計(jì)算。
特殊情況特殊處理:
在業(yè)務(wù)邏輯優(yōu)化效果的不大情況下,有些時(shí)候是可以將傾斜的數(shù)據(jù)單獨(dú)拿出來處理。最后union回去。
4、典型的業(yè)務(wù)場(chǎng)景
1)空值產(chǎn)生的數(shù)據(jù)傾斜
場(chǎng)景:如日志中,常會(huì)有信息丟失的問題,比如日志中的 user_id,如果取其中的 user_id 和 用戶表中的user_id 關(guān)聯(lián),會(huì)碰到數(shù)據(jù)傾斜的問題。
解決方法一:user_id為空的不參與關(guān)聯(lián)
select * from log a
join users b
on a.user_id is not null
and a.user_id = b.user_id
union all
select * from log a
where a.user_id is null;
解決方法二:賦與空值分新的key值
select *
from log a
left outer join users b
on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end =
b.user_id;
結(jié)論:
方法2比方法1效率更好,不但io少了,而且作業(yè)數(shù)也少了。解決方法一中l(wèi)og讀取兩次,jobs是2。解決方法二job數(shù)是1。這個(gè)優(yōu)化適合無效id (比如 -99 , ’’, null 等) 產(chǎn)生的傾斜問題。把空值的key變成一個(gè)字符串加上隨機(jī)數(shù),就能把傾斜的數(shù)據(jù)分到不同的reduce上,解決數(shù)據(jù)傾斜問題。
2)不同數(shù)據(jù)類型關(guān)聯(lián)產(chǎn)生數(shù)據(jù)傾斜
場(chǎng)景:用戶表中user_id字段為int,log表中user_id字段既有string類型也有int類型。當(dāng)按照user_id進(jìn)行兩個(gè)表的Join操作時(shí),默認(rèn)的Hash操作會(huì)按int型的id來進(jìn)行分配,這樣會(huì)導(dǎo)致所有string類型id的記錄都分配到一個(gè)Reducer中。
解決方法:把數(shù)字類型轉(zhuǎn)換成字符串類型
select * from users a
left outer join logs b
on a.usr_id = cast(b.user_id as string);
3)小表不小不大,怎么用 map join 解決傾斜問題
使用map join解決小表(記錄數(shù)少)關(guān)聯(lián)大表的數(shù)據(jù)傾斜問題,這個(gè)方法使用的頻率非常高,但如果小表很大,大到map join會(huì)出現(xiàn)bug或異常,這時(shí)就需要特別的處理。
例子:
select * from log a
left outer join users b
on a.user_id = b.user_id;
users表有600w+的記錄,把users分發(fā)到所有的map上也是個(gè)不小的開銷,而且map join不支持這么大的小表。如果用普通的join,又會(huì)碰到數(shù)據(jù)傾斜的問題。
解決方法:
select /*+mapjoin(x)*/* from log aleft outer join (select /*+mapjoin(c)*/d.*from ( select distinct user_id from log ) cjoin users don c.user_id = d.user_id) x
on a.user_id = b.user_id;
假如,log里user_id有上百萬個(gè),這就又回到原來map join問題。所幸,每日的會(huì)員uv不會(huì)太多,有交易的會(huì)員不會(huì)太多,有點(diǎn)擊的會(huì)員不會(huì)太多,有傭金的會(huì)員不會(huì)太多等等。所以這個(gè)方法能解決很多場(chǎng)景下的數(shù)據(jù)傾斜問題。
Hive數(shù)據(jù)同步到HDFS,小文件問題怎么解決的?
我們先考慮小文件的產(chǎn)生和影響:
1)哪里會(huì)產(chǎn)生小文件
- 源數(shù)據(jù)本身有很多小文件
- 動(dòng)態(tài)分區(qū)會(huì)產(chǎn)生大量小文件
- reduce個(gè)數(shù)越多, 小文件越多
- 按分區(qū)插入數(shù)據(jù)的時(shí)候會(huì)產(chǎn)生大量的小文件, 文件個(gè)數(shù) = maptask個(gè)數(shù) * 分區(qū)數(shù)
2)小文件太多造成的影響
- 從Hive的角度看,小文件會(huì)開很多map,一個(gè)map開一個(gè)JVM去執(zhí)行,所以這些任務(wù)的初始化,啟動(dòng),執(zhí)行會(huì)浪費(fèi)大量的資源,嚴(yán)重影響性能。
- HDFS存儲(chǔ)太多小文件, 會(huì)導(dǎo)致namenode元數(shù)據(jù)特別大, 占用太多內(nèi)存, 制約了集群的擴(kuò)展
小文件解決方案:
方法一:通過調(diào)整參數(shù)進(jìn)行合并
1)在Map輸入的時(shí)候, 把小文件合并
-- 老版本 每個(gè)Map最大輸入大小,決定合并后的文件數(shù)
set mapred.max.split.size=256000000;
-- hadoop2.x 每個(gè)Map最大輸入大小,決定合并后的文件數(shù)
set mapreduce.input.fileinputformat.split.maxsize=2048000000;(2G)
-- 一個(gè)節(jié)點(diǎn)上split的至少的大小 ,決定了多個(gè)data node上的文件是否需要合并
set mapred.min.split.size.per.node=100000000;
-- 一個(gè)交換機(jī)下split的至少的大小,決定了多個(gè)交換機(jī)上的文件是否需要合并
set mapred.min.split.size.per.rack=100000000;
-- 執(zhí)行Map前進(jìn)行小文件合并開關(guān)開啟:true
set hive.hadoop.supports.splittable.combineinputformat = true;
-- 執(zhí)行Map前進(jìn)行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
2)在Reduce輸出的時(shí)候, 把小文件合并
-- (和下面的2選1)在map-only job后合并文件,默認(rèn)true
set hive.merge.mapfiles = true;
-- (和下面的2選1)在map-reduce job后合并文件,默認(rèn)false
set hive.merge.mapredfiles = true;
-- 合并后每個(gè)文件的大小,默認(rèn)256000000
set hive.merge.size.per.task = 256000000;
-- 平均文件大小,是決定是否執(zhí)行合并操作的閾值,默認(rèn)16000000
set hive.merge.smallfiles.avgsize = 100000000;
方法二:(Hive sql 執(zhí)行完創(chuàng)建小文件數(shù)過多)針對(duì)按分區(qū)插入數(shù)據(jù)的時(shí)候產(chǎn)生大量的小文件的問題,可以使用DISTRIBUTE BY rand() 將數(shù)據(jù)隨機(jī)分配給Reduce,這樣可以使得每個(gè)Reduce處理的數(shù)據(jù)大體一致。
-- 設(shè)置每個(gè)reducer處理的大小為5個(gè)G
set hive.exec.reducers.bytes.per.reducer=5120000000;
-- 使用distribute by rand()將數(shù)據(jù)隨機(jī)分配給reduce, 避免出現(xiàn)有的文件特別大, 有的文件特別小
insert overwrite table test partition(dt)
select * from iteblog_tmp
DISTRIBUTE BY rand();
方法三:使用Sequencefile作為表存儲(chǔ)格式,不要用textfile,在一定程度上可以減少小文件
方法四:使用hadoop的archive歸檔
-- 用來控制歸檔是否可用
set hive.archive.enabled=true;
-- 通知Hive在創(chuàng)建歸檔時(shí)是否可以設(shè)置父目錄
set hive.archive.har.parentdir.settable=true;
-- 控制需要?dú)w檔文件的大小
set har.partfile.size=1099511627776;
-- 使用以下命令進(jìn)行歸檔
ALTER TABLE srcpart ARCHIVE PARTITION(ds='2008-04-08', hr='12');
-- 對(duì)已歸檔的分區(qū)恢復(fù)為原文件
ALTER TABLE srcpart UNARCHIVE PARTITION(ds='2008-04-08', hr='12');
-- 注意,歸檔的分區(qū)不能夠INSERT OVERWRITE,必須先unarchive
Hadoop自帶的三種小文件處理方案
-
Hadoop Archive
Hadoop Archive或者HAR,是一個(gè)高效地將小文件放入HDFS塊中的文件存檔工具,它能夠?qū)⒍鄠€(gè)小文件打包成一個(gè)HAR文件,這樣在減少namenode內(nèi)存使用的同時(shí),仍然允許對(duì)文件進(jìn)行透明的訪問。 -
Sequence file
sequence file由一系列的二進(jìn)制key/value組成,如果為key小文件名,value為文件內(nèi)容,則可以將大批小文件合并成一個(gè)大文件。 -
CombineFileInputFormat
它是一種新的inputformat,用于將多個(gè)文件合并成一個(gè)單獨(dú)的split,另外,它會(huì)考慮數(shù)據(jù)的存儲(chǔ)位置。
hive 壓縮
壓縮Map的輸出,這樣做有兩個(gè)好處
- a)壓縮是在內(nèi)存中進(jìn)行,所以寫入map本地磁盤的數(shù)據(jù)就會(huì)變小,大大減少了本地IO次數(shù)
- b) Reduce從每個(gè)map節(jié)點(diǎn)copy數(shù)據(jù),也會(huì)明顯降低網(wǎng)絡(luò)傳輸?shù)臅r(shí)間
# 打開job最終輸出壓縮的開關(guān),設(shè)置之后必須設(shè)置下面這行,否則還是沒有壓縮效果
set hive.exec.compress.output=true;
# 設(shè)置壓縮類型
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
# 大文件壓縮仍然會(huì)耗時(shí),而且影響mapper并行(mapper并行和文件的個(gè)數(shù)有關(guān)),
# 這個(gè)設(shè)置,使大的文件可以分割成小文件進(jìn)行壓縮
set mapred.output.compression.type=BLOCK;
?
# 中間數(shù)據(jù)map壓縮,不影響最終結(jié)果。但是job中間數(shù)據(jù)輸出要寫在硬盤并通過網(wǎng)絡(luò)傳輸?shù)絩educe,
# 傳送數(shù)據(jù)量變小,因?yàn)閟huffle sort(混洗排序)數(shù)據(jù)被壓縮了。
set hive.exec.compress.intermediate=true;
# 為中間數(shù)據(jù)配置壓鎖編解碼器 ,通常配置Snappy更好。
set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
合并分區(qū)小文件操作思路
1.創(chuàng)建備份表
create test.tb_name_bak like test.tb_name;
2.設(shè)置合并的相關(guān)參數(shù)并將原表的數(shù)據(jù)插入備份表中
#任務(wù)結(jié)束時(shí)合并小文件
SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
#合并文件大小256M
SET hive.merge.size.per.task = 256000000;
#當(dāng)輸出文件平均大小小于該值時(shí),啟用文件合并
SET hive.merge.smallfiles.avgsize = 134217728;
#壓縮輸出
SET hive.exec.compress.output = true;
#壓縮方式:snappy
SET parquet.compression = snappy;
#默認(rèn)值為srticat,nonstrict模式表示允許所有分區(qū)字段都可以使用動(dòng)態(tài)分區(qū)
SET hive.exec.dynamic.partition.mode = nonstrict;
#使用動(dòng)態(tài)分區(qū)
SET hive.exec.dynamic.partition = true;
#在所有執(zhí)行MR的節(jié)點(diǎn)上,共可以創(chuàng)建多少個(gè)動(dòng)態(tài)分區(qū)
SET hive.exec.max.dynamic.partitions=3000;
#在執(zhí)行MR的單節(jié)點(diǎn)上,最大可以創(chuàng)建多少個(gè)分區(qū)
SET hive.exec.max.dynamic.partitions.pernode=500;
insert overwrite table test.tb_name_bak partition(date_str) select * from test.tb_name;
3.檢查備份表和原表數(shù)據(jù)是否一致,若一致,則進(jìn)行下一步操作
SELECT count(*) FROM test.tb_name;
SELECT count(*) FROM test.tb_name_bak;
數(shù)據(jù)轉(zhuǎn)換操作有兩種方式:
1.操作表的方式
- 刪除原表
drop table test.tb_name;
- 將備份表重命名為原表
alter table test.tb_name_bak rename to test.tb_name;
2.操作hdfs文件方式
- 將原表hdfs路徑下的文件移到某個(gè)備份目錄,確認(rèn)無誤之后,再將文件移動(dòng)回收站
- 將備份表hdfs路徑下的文件移到原表目錄下