免費網(wǎng)站的手機版本源碼模板廣告軟文案例
文章目錄
- (99)WritableComparable排序
- 什么是排序
- 什么時候需要排序
- 排序有哪些分類
- 如何實現(xiàn)自定義排序
- (100)全排序案例
- 案例需求
- 思路分析
- 實際代碼
- (101)二次排序案例
- (102) 區(qū)內(nèi)排序案例
- 參考文獻
(99)WritableComparable排序
什么是排序
排序是MR中最重要的操作之一,也是面試中可能被問到的重點。
MapTask和ReduceTask中都會對數(shù)據(jù)按照KEY來排序,主要是為了效率,排完序之后,相同key值的數(shù)據(jù)會被放在一起,更方便下一步(如Reducer())的匯總處理。
默認排序是按照字典順序(字母由小到大,或者是數(shù)字由小到大)排序,且實現(xiàn)該排序的方法是快速排序。
什么時候需要排序
MR的過程中,什么時候用到了排序呢?
Map階段:
- 環(huán)形緩沖區(qū)溢寫到磁盤之前,會將每個分區(qū)內(nèi)數(shù)據(jù)分別進行一個快排,這個排序是在內(nèi)存中完成的;(對key的索引,按照字典順序排列)
- 環(huán)形緩沖區(qū)多輪溢寫完畢后,會形成一堆文件,這時候會對這些文件做merge歸并排序,我理解是單個MapTask最終會匯總形成一個文件;
Reduce階段:
- ReduceTask會主動拉取MapTask們的輸出文件,理論上是會優(yōu)先保存到內(nèi)存里,但是往往內(nèi)存里放不下,所以多數(shù)情況下會直接溢寫到磁盤,于是我們會得到多個文件。當(dāng)文件數(shù)量超過閾值,之后需要做歸并排序,合并成一個大文件。如果是內(nèi)存中的數(shù)據(jù)超過閾值,則會進行一次合并后將數(shù)據(jù)溢寫到磁盤。當(dāng)所有數(shù)據(jù)拷貝完后,ReduceTask會統(tǒng)一對內(nèi)存和磁盤上的所有數(shù)據(jù)進行一次歸并排序。
- 文件合并后其實還可以進行一個分組排序,過于復(fù)雜,這里就不介紹了。
排序有哪些分類
MR里的排序還有部分排序、全排序、輔助排序、二次排序的不同說法,注意,它們之間不是像那種傳統(tǒng)的排序算法之間的區(qū)別,只是當(dāng)排序在不同場景的時候,分別起了個名字。
MapReduce根據(jù)輸入記錄的鍵對數(shù)據(jù)集排序,保證輸出的每個文件內(nèi)部是有序的,這就是部分排序。
最終輸出結(jié)果只有一個文件,且文件內(nèi)部有序。這就是全排序。
全排序的實現(xiàn)方式是只設(shè)置一個ReduceTask。但是這種方式在處理大型文件時效率很低很低,因為一臺機器處理全部數(shù)據(jù),完全沒有利用MR所提供的并行架構(gòu)的優(yōu)勢,生產(chǎn)環(huán)境上完全不適用。
所以生產(chǎn)環(huán)境里,常用的還是部分排序。
輔助排序,就是GroupingComparator分組。
這個似乎是可選的,是在Reduce階段,Reducer在從Map階段主動拉取完數(shù)據(jù)后,會對所有文件做一次歸并排序。做完歸并排序之后,理論上就可以進行輔助排序。
輔助排序有啥用呢,就是當(dāng)接收到的Key是個bean對象時,輔助排序可以讓一個或者幾個字段相同的key(全部字段不相同)進入同一個Reduce(),所以也起名叫做分組排序。
二次排序比較簡單,在自定義排序過程中,如果compareTo中的判斷條件為兩個,那它就是二次排序。
如何實現(xiàn)自定義排序
說到這里,那 如何實現(xiàn)自定義排序 呢?
如果是bean對象作為key傳輸,那需要實現(xiàn)WritableComparable接口,重寫compareTo方法,就可以實現(xiàn)自定義排序。
@Override
public int compareTo(FlowBean bean) {int result;// 按照總流量大小,倒序排列if (this.sumFlow > bean.getSumFlow()) {result = -1;}else if (this.sumFlow < bean.getSumFlow()) {result = 1;}else {result = 0;}return result;
}
(100)全排序案例
案例需求
之前我們做過一個案例,輸入文件有一個,里面放的是每個手機號的上行流量和下行流量,輸出同樣是一個文件,里面放的除了手機號的上行流量和下行流量之外,還多了一行總流量。
這時候我們提一個新需求,就是我不止要這個輸出文件,我還要這個文件里的內(nèi)容,按照總流量降序排列。
思路分析
MapReduce里,只能對Key進行排序。在先前的需求里,我們是用手機號作為key,上行流量、下行流量和總流量組成一個bean,作為value,這樣的安排顯然不適合新需求。
因此我們需要改變一下,將上行流量、下行流量和總流量組成的bean作為key,而將手機號作為value,如此來排序。
所以第一步,我們需要對我們自定義的FlowBean對象聲明WritableComparable接口,并重寫CompareTo方法,這一步的目的是使得FlowBean可進行算數(shù)比較,從而允許排序:
@Override
public int CompareTo(FlowBean o){// 按照總流量,降序排列return this.sumFlow > o.getSumFlow()?-1:1;
}
注意這里,因為Hadoop里默認的字典排序是從小到大排序,如果想實現(xiàn)案例里由大到小的排序,那么當(dāng)大于的時候,就要返回-1,從而將大的值排在前面。
其次,Mapper類里:
context.write(bean, 手機號)
bean成了key,手機號成了value。
最后,Reduce類里,需要循環(huán)輸出,避免出現(xiàn)總流量相同的情況。
for (Text text: values){context.write(text, key); // 注意順序,原先的key放在value位置
}
2023-7-19 11:16:04 這里沒懂。。。
哦哦明白了,什么樣的數(shù)據(jù)會進一個Reducer呢,當(dāng)然是key 值相同的會進同一個,又因為我們之前compareTo
的時候用的是總流量,所以最后是總流量相同的記錄會送進同一個Reducer,然后匯總成一條記錄做輸出,畢竟reducer就是用來做匯總的。
但"匯總成一條記錄"這并不是我們想要的,我們需要的是把這些數(shù)據(jù)原模原樣輸出來。這就是為什么我們在Reducer的reduce()里面,要加上循環(huán)輸出的原因。
實際代碼
貼一下教程里的代碼實現(xiàn):
首先是FlowBean對象,需要聲明WritableComparable
接口,并重寫CompareTo()
package com.atguigu.mapreduce.writablecompable;import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class FlowBean implements WritableComparable<FlowBean> {private long upFlow; //上行流量private long downFlow; //下行流量private long sumFlow; //總流量//提供無參構(gòu)造public FlowBean() {}//生成三個屬性的getter和setter方法public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public void setSumFlow() {this.sumFlow = this.upFlow + this.downFlow;}//實現(xiàn)序列化和反序列化方法,注意順序一定要一致@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(this.upFlow);out.writeLong(this.downFlow);out.writeLong(this.sumFlow);}@Overridepublic void readFields(DataInput in) throws IOException {this.upFlow = in.readLong();this.downFlow = in.readLong();this.sumFlow = in.readLong();}//重寫ToString,最后要輸出FlowBean@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}@Overridepublic int compareTo(FlowBean o) {//按照總流量比較,倒序排列if(this.sumFlow > o.sumFlow){return -1;}else if(this.sumFlow < o.sumFlow){return 1;}else {return 0;}}
}
然后編寫Mapper類:
package com.atguigu.mapreduce.writablecompable;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {private FlowBean outK = new FlowBean();private Text outV = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//1 獲取一行數(shù)據(jù)String line = value.toString();//2 按照"\t",切割數(shù)據(jù)String[] split = line.split("\t");//3 封裝outK outVoutK.setUpFlow(Long.parseLong(split[1]));outK.setDownFlow(Long.parseLong(split[2]));outK.setSumFlow();outV.set(split[0]);//4 寫出outK outVcontext.write(outK,outV);}
}
然后編寫Reducer類:
package com.atguigu.mapreduce.writablecompable;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {@Overrideprotected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {//遍歷values集合,循環(huán)寫出,避免總流量相同的情況for (Text value : values) {//調(diào)換KV位置,反向?qū)懗?/span>context.write(value,key);}}
}
最后編寫驅(qū)動類:
package com.atguigu.mapreduce.writablecompable;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;public class FlowDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//1 獲取job對象Configuration conf = new Configuration();Job job = Job.getInstance(conf);//2 關(guān)聯(lián)本Driver類job.setJarByClass(FlowDriver.class);//3 關(guān)聯(lián)Mapper和Reducerjob.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);//4 設(shè)置Map端輸出數(shù)據(jù)的KV類型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);//5 設(shè)置程序最終輸出的KV類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//6 設(shè)置輸入輸出路徑FileInputFormat.setInputPaths(job, new Path("D:\\inputflow2"));FileOutputFormat.setOutputPath(job, new Path("D:\\comparout"));//7 提交Jobboolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}
完成,僅做了解即可。
(101)二次排序案例
二次排序的概念很簡單,其實之前提過了,就是在自定義排序的時候,判斷條件有兩個。
比如說,原先我對一堆人排序,是按照身高從高到低排,但是身高一樣的就沒法排序了,這時候我可以再加入一個判斷條件,比如說如果身高一樣的話,就按體重排序。
具體就是修改FlowBean的CompareTo方法,在第一條件相等的時候,添加第二判定條件。
public int compareTo(FlowBean o) {//按照總流量比較,倒序排列if(this.sumFlow > o.sumFlow){return -1;}else if(this.sumFlow < o.sumFlow){return 1;}else {if (this.upFlow > o.upFlow){return 1;} else if (this.upFlow < o.upFlow){return -1;}else {return 0;}}
}
如果有需要的話,還可以繼續(xù)加第三判定條件。
(102) 區(qū)內(nèi)排序案例
還是之前的手機號案例,之前我們想要的是,只有一個文件,然后文件內(nèi)所有數(shù)據(jù)按照總流量降序排列。
現(xiàn)在我們提出一個新要求,按照前3位來分區(qū)輸出,比如說136的在一個文件里,137的在一個文件里,以此類推。而且每個文件內(nèi)部,還需要按照總流量降序排列。
本質(zhì)上就是之前說的分區(qū) + 排序,這兩部分的結(jié)合。需要額外定義好Partitioner類。
貼一下教程里的代碼示例,其實只需要在上一小節(jié)的基礎(chǔ)上補充自定義分區(qū)類即可:
首先自定義好分區(qū)類:
package com.atguigu.mapreduce.partitionercompable;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {@Overridepublic int getPartition(FlowBean flowBean, Text text, int numPartitions) {//獲取手機號前三位String phone = text.toString();String prePhone = phone.substring(0, 3);//定義一個分區(qū)號變量partition,根據(jù)prePhone設(shè)置分區(qū)號int partition;if("136".equals(prePhone)){partition = 0;}else if("137".equals(prePhone)){partition = 1;}else if("138".equals(prePhone)){partition = 2;}else if("139".equals(prePhone)){partition = 3;}else {partition = 4;}//最后返回分區(qū)號partitionreturn partition;}
}
然后在驅(qū)動類里注冊好分區(qū)器:
// 設(shè)置自定義分區(qū)器
job.setPartitionerClass(ProvincePartitioner2.class);// 設(shè)置對應(yīng)的ReduceTask的個數(shù)
job.setNumReduceTasks(5);
其他跟上一小節(jié)保持一致即可。
參考文獻
- 【尚硅谷大數(shù)據(jù)Hadoop教程,hadoop3.x搭建到集群調(diào)優(yōu),百萬播放】