重慶網(wǎng)站建設(shè)價格網(wǎng)壇最新排名
文章目錄
- (96) 默認HashPartitioner分區(qū)
- (97) 自定義分區(qū)案例
- (98)分區(qū)數(shù)與Reduce個數(shù)的總結(jié)
- 參考文獻
(96) 默認HashPartitioner分區(qū)
分區(qū),是Shuffle里核心的一環(huán),不同分區(qū)的數(shù)據(jù)最終會被送進不同的ReduceTask去處理。之前的幾個小節(jié)里也都講過分區(qū)。
Hadoop里默認的分區(qū)方式是HashPartitioner分區(qū),核心代碼:
public class HashPartitioner<K, V> extends Partitioner<K, V> {public int getPartition(K key, V value, iint numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}
}
在HashPartitioner里,每個key分到哪個ReduceTask(可以理解成Key屬于哪個分區(qū)),是根據(jù)每個key的hashCode對ReduceTask的個數(shù)取模得到的,用戶是沒法控制的。
這里是為什么還要& Integer.MAX_VALUE
呢?
主要是為了防止溢寫,通過& Integer.MAX_VALUE
,將key的hash值控制在Integer.MAX_VALUE
及之下。
從代碼里看,在往環(huán)形緩沖區(qū)寫的時候,如果識別到numReduceTasks > 1
,則啟用HashPartitioner分區(qū),如果numReduceTasks = 1
,那就不啟用了,直接return numReduceTasks - 1
。
我們也可以自定義Partitioner,自定義類需要繼承Partitioner類,并重寫里面的getPartition()方法。
public class CustomPartitioner extendsPartitioner<Text, FlowBean>{@overridepublic int getPartition(Text key, FlowBean value, int numPartitions){//控制分區(qū)代碼邏輯。。。。。。return partition;}}
然后在驅(qū)動類里,設(shè)置上寫好的自定義Partitioner:
job.setPartitionerClass(CustomPartitioner.class);
最后再設(shè)置上ReduceTask的數(shù)量:
job.setNumReduceTasks(5);
如果不設(shè)置ReduceTask的數(shù)量,那分區(qū)數(shù)默認是1,直接return 0,不會啟用自定義分區(qū)。
(97) 自定義分區(qū)案例
首先拋出一個需求:將一堆手機號按照歸屬地的省份輸出到不同的文件里。
已有一個phone_data.txt文件。
所以期望的輸出數(shù)據(jù)是什么樣子的呢?
手機號136/137/138/139開頭的分別放進4個獨立的文件里,然后其他的手機號放到一個文件里。最終形成5個文件。
顯而易見,這個需求的核心在于自定義分區(qū)上。
所以我們需要寫一個自定義分區(qū)類,假設(shè)它叫ProvincePartitioner
,我們希望它能做到以下分配:
136 分區(qū)0
137 分區(qū)1
138 分區(qū)2
139 分區(qū)3
其他 分區(qū)4
等分區(qū)類建好后,別忘記在驅(qū)動里注冊上這個類,并定義好ReduceTask數(shù)量。
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);
展示一下ProvincePartitioner
類的代碼:
package com.atguigu.mapreduce.partitioner;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class ProvincePartitioner extends Partitioner<Text, FlowBean> {@Overridepublic int getPartition(Text text, FlowBean flowBean, int numPartitions) {//獲取手機號前三位prePhoneString phone = text.toString();String prePhone = phone.substring(0, 3);//定義一個分區(qū)號變量partition,根據(jù)prePhone設(shè)置分區(qū)號int partition;if("136".equals(prePhone)){partition = 0;}else if("137".equals(prePhone)){partition = 1;}else if("138".equals(prePhone)){partition = 2;}else if("139".equals(prePhone)){partition = 3;}else {partition = 4;}//最后返回分區(qū)號partitionreturn partition;}
}
(98)分區(qū)數(shù)與Reduce個數(shù)的總結(jié)
思考這么一個問題,如果自定義Partitioner
中定義了5個分區(qū),但是驅(qū)動類里注冊的時候,只聲明了4個分區(qū),即job.setNumReduceTask=4
,那這時候代碼會正常運行么?
不會,會報java.io.IOException。
至于為什么報IO異常,自然是MapTask中,在往環(huán)形緩沖器Collector里寫的時候,發(fā)現(xiàn)沒有第5個分區(qū),寫不進去當然就報IO異常。
但是,設(shè)置job.setNumReduceTask=1
,代碼是可以跑的,這是為什么呢?
原因其實之前提過,這是因為設(shè)置為1后,MapTask里,Collector在collect數(shù)據(jù)的時候,分區(qū)就不走我們自定義的Partitioner,而是直接return 0了,到最后Reduce階段也只會生成一個文件。
這里是有點反直覺的,需要注意。
那我如果job.setNumReduceTask=6
呢,代碼還能跑嗎?
可以跑,且會生成6個文件,只不過第6個文件是空的。
總結(jié)一下:
- 當NumReduceTask > getPartition()里定義的分區(qū)數(shù)量,可以正常運行,但是相應的,會多余生成一些空的文件,浪費計算資源和存儲資源;
- 當 1 < NumReduceTask < getPartition()分區(qū)量,會報IO異常,因為少的那一部分分區(qū)的數(shù)據(jù)會無法寫入;
- 當NumReduceTask = 1時,不會調(diào)用自定義分區(qū)器,而是會將所有的數(shù)據(jù)都交付給一個ReduceTask,最后也只會生成一個文件。
- 自定義分區(qū)類時,分區(qū)號必須從0開始,且必須是連續(xù)的,即是逐一累加的。
最后一條比較重要,即必須是0/1/2/3/4/5/…這種形式,而不能是0/10/11/20這種。
2023-7-24 17:08:08 我有個小問題,就是驅(qū)動類里設(shè)置setNumReduceTask的時候,能不能設(shè)置成動態(tài)的,就是根據(jù)輸入數(shù)據(jù)調(diào)整的呢?
查了一下,確實是有這種取巧的方式,比如說使用自定義的InputFormat,在讀取數(shù)據(jù)的同時,獲取數(shù)據(jù)量的情況,并根據(jù)這些信息動態(tài)調(diào)整ReduceTask的數(shù)量。這里就不多講了,有興趣可以查查。
參考文獻
- 【尚硅谷大數(shù)據(jù)Hadoop教程,hadoop3.x搭建到集群調(diào)優(yōu),百萬播放】