常見(jiàn)的企業(yè)網(wǎng)站有哪些百度店鋪怎么開(kāi)通
MapReduce概述
是一個(gè)分布式的編程框架,MapReduce核心功能是將用戶編寫(xiě)的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的分布式運(yùn)算程序,并發(fā)運(yùn)行在一個(gè)Hadoop集群上。
- 優(yōu)點(diǎn):
- 易于編程,簡(jiǎn)單的實(shí)現(xiàn)一些接口,就可以完成一個(gè)分布式程序。
- 良好的擴(kuò)展性,可以增加機(jī)器來(lái)擴(kuò)展計(jì)算能力。
- 高容錯(cuò)性,子任務(wù)失敗后可以重試4次。
- 缺點(diǎn):
- 不擅長(zhǎng)實(shí)時(shí)計(jì)算
- 不能進(jìn)行流式計(jì)算,MapReduce的輸入數(shù)據(jù)集是靜態(tài)的,不能動(dòng)態(tài)變化。
- 不擅長(zhǎng)DAG有向無(wú)環(huán)圖:下一段計(jì)算的起始數(shù)據(jù)取決于上一個(gè)階段的結(jié)果。
MapReduce核心思想
- Map: 讀單詞,進(jìn)行分區(qū)
- Shuffle:排序是框架內(nèi)固定的代碼,必須排序。進(jìn)行快排
- Reduce:對(duì)區(qū)間有序的內(nèi)容進(jìn)行歸并排序,累加單詞
- 在MapReduce過(guò)程中只能有一個(gè)Map和一個(gè)Reduce
- MapReduce進(jìn)程
- MrAppMaster:負(fù)責(zé)整個(gè)程序的過(guò)程調(diào)度及狀態(tài)協(xié)調(diào)。
- MapTask:負(fù)責(zé)Map階段的整個(gè)數(shù)據(jù)處理流程。
- ReduceTask:負(fù)責(zé)Reduce階段的整個(gè)數(shù)據(jù)處理流程。
- 序列化
- 變量類(lèi)型后面加上Writable,轉(zhuǎn)換成可以序列化的類(lèi)型
- String類(lèi)型有點(diǎn)特別,相應(yīng)的序列化類(lèi)型為T(mén)ext
- java類(lèi)型轉(zhuǎn)hadoop類(lèi)型
private IntWritable key = new IntWritable();
key.set(java_value);
- 構(gòu)造器轉(zhuǎn)換
new intWritable(1);
- hadoop類(lèi)型轉(zhuǎn)java類(lèi)型
int value = key.get();
WordCount案例
Driver類(lèi)的8個(gè)步驟
該類(lèi)中的步驟是使用hadoop框架的核心,這8個(gè)步驟是寫(xiě)死的,無(wú)法更改,具體為:
- 獲取配置信息,獲取job對(duì)象實(shí)例
- 指定本程序的jar包所在的本地路徑
- 關(guān)聯(lián)Mapper/Reducer業(yè)務(wù)類(lèi)
- 指定Mapper輸出數(shù)據(jù)的kv類(lèi)型
- 指定最終輸出數(shù)據(jù)的kv類(lèi)型,部分案例不需要reduce這個(gè)步驟
- 指定job的輸入原始文件所在目錄
- 指定job的輸出結(jié)果所在目錄
- 提交作業(yè)
WordCountMapper類(lèi)的實(shí)現(xiàn)
- 繼承mapper類(lèi),選擇mapreduce包,新版本,老版本的叫mapred包
- 根據(jù)業(yè)務(wù)需求設(shè)定泛型的具體類(lèi)型,輸入的kv類(lèi)型,輸出的kv類(lèi)型。
- 輸入類(lèi)型
- keyIn:起始偏移量,是字節(jié)偏移量。一般不參與計(jì)算,類(lèi)型為longwritable
- valueIn:每一行數(shù)據(jù),類(lèi)型為T(mén)ext
- 輸出類(lèi)型
- keyOut: 每個(gè)單詞
- valueOut:數(shù)字1
- 輸入類(lèi)型
- 重寫(xiě)map方法,ctrl + o 快捷鍵重寫(xiě)方法
@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {//1.獲取一行// atguigu atguiguString line = value.toString();//2.切割//atguigu//atguiguString[] words = line.split(" ");//3.循環(huán)寫(xiě)出for (String word : words) {//封裝outKeycontext.write(new Text(word),new IntWritable(1));}}
WordCountReducer類(lèi)的實(shí)現(xiàn)
- 繼承Reduce類(lèi)
- 設(shè)置輸入輸出類(lèi)型
- 輸入: 跟Map的輸出類(lèi)型對(duì)應(yīng)即可
- 輸出:
- keyInt, 單詞,類(lèi)型為T(mén)ext
- keyOut, 次數(shù),類(lèi)型為IntWritable
- 重寫(xiě)reduce方法
@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int sum = 0;//atguigu(1,1)//累加for (IntWritable value : values) {sum += value.get();}//寫(xiě)出context.write(key, new IntWritable(sum);//注意}
在處理大數(shù)據(jù)時(shí),不要在循環(huán)中new對(duì)象,創(chuàng)建對(duì)象是很消耗資源的??梢允褂?code>ctrl + alt + F將這兩個(gè)變量提升為全局變量,作為Reducer類(lèi)的屬性值。但其實(shí)還有更好的方法,可以使用Mapper類(lèi)中的setup()方法來(lái)實(shí)現(xiàn)該需求。該方法是框架里面原本就設(shè)定好的方法,在map階段前只會(huì)執(zhí)行1次。
private Text outKey;private IntWritable outValue;@Overrideprotected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {outKey = new Text();outValue = new IntWritable(1);}
Driver類(lèi)的實(shí)現(xiàn)
該類(lèi)的寫(xiě)法基本上是固定的,不同需求只需要在此基礎(chǔ)上修改一下map和reduce業(yè)務(wù)類(lèi)即可。
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//1.獲取JobConfiguration conf = new Configuration();Job job = Job.getInstance();//2.設(shè)置jar包路徑,綁定driver類(lèi)job.setJarByClass(WordCountDriver.class);//3.關(guān)聯(lián)mapper和reducerjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);//4.設(shè)置map的kv類(lèi)型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//5.設(shè)置最終輸出的kv類(lèi)型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//6.設(shè)置輸入路徑和輸出路徑FileInputFormat.setInputPaths(job, new Path("D:\\inputOutput\\input\\wordcount"));FileOutputFormat.setOutputPath(job, new Path("D:\\inputOutput\\output\\output666"));//7.提交jobjob.waitForCompletion(true);}
打包本地程序到集群中運(yùn)行
- 修改本地程序Driver類(lèi)中的輸入輸出路徑
//6.設(shè)置輸入路徑和輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
- 使用maven的package命令打包本地程序成.jar文件,生成的jar包在target目錄下
- 使用jar命令解壓運(yùn)行
jar 包名 數(shù)據(jù)輸入路徑 數(shù)據(jù)輸出路徑
可以直接在window中的IDEA中發(fā)送任務(wù)到Linux集群中,但配置方式較為煩瑣,生產(chǎn)環(huán)境中使用該方法的人較少。
序列化
java序列化
序列化目的是將內(nèi)存的對(duì)象放進(jìn)磁盤(pán)中進(jìn)行保存,序列化實(shí)際上是為了傳輸對(duì)象中的屬性值,而不是方法。序列化本質(zhì)上是一種數(shù)據(jù)傳輸技術(shù),IO流。
- ObjectOutputStream 序列化流 writeObject()
- ObjectInputStream 反序列化流 readObject(Object obj)
- 序列化前提條件:
- 實(shí)現(xiàn)Serializable接口
- 空參構(gòu)造器
- 私有的屬性
- 準(zhǔn)備get和set方法
總結(jié):java序列化是一個(gè)比較重的序列化,序列化的內(nèi)容很多,比如屬性+校驗(yàn)+血緣關(guān)系+元數(shù)據(jù)。
hadoop序列化
特點(diǎn): 輕量化,只有屬性值和校驗(yàn)
hadoop中自定義bean對(duì)象步驟:
- 實(shí)現(xiàn)writable接口
- 空參構(gòu)造器
- 私有的屬性
- get和set方法
- 重寫(xiě)
write
方法和readFields
方法(write方法出現(xiàn)的屬性順序必須和readFields的讀取順序一致) - 如果自定義的bean對(duì)象要作為reduce的輸出結(jié)果,需要重寫(xiě)
toString
方法,否則存入磁盤(pán)的是地址值 - 如果自定義的bean對(duì)象要作為map輸出結(jié)果中的key進(jìn)行輸出,并進(jìn)行reduce操作,必須實(shí)現(xiàn)
comparable
接口。
MapReduce框架原理
- 粗略流程:
mapreduce = map + reduce
- 大體流程:
mapreduce = inputFormat --> map --> shuffle(排序) --> reduce -->outputFormat
- 源碼流程:
- inputformat
- map
- map
- sort: 按照字典序進(jìn)行快排
- reduce
- copy:拉取map的處理結(jié)果
- sort:由于結(jié)果是局部有序,不是整體有序,進(jìn)行歸并排序
- reduce:之后再進(jìn)行數(shù)據(jù)合并規(guī)約
InputFormat/OutputFormat基類(lèi)
實(shí)現(xiàn)類(lèi)有TextInputFormat和TextOutputFormat, 其中重點(diǎn)是切片邏輯和讀寫(xiě)邏輯,讀寫(xiě)部分的代碼框架已經(jīng)寫(xiě)死了,主要關(guān)注如何切片即可。
切片與MapTask并行度決定機(jī)制
數(shù)據(jù)塊:Block是物理上真的分開(kāi)存儲(chǔ)了。
數(shù)據(jù)切片:只是邏輯上進(jìn)行分片處理,每個(gè)數(shù)據(jù)切片對(duì)應(yīng)一個(gè)MapTask。
正常來(lái)說(shuō),如果數(shù)據(jù)有300M,我們按照常理來(lái)說(shuō)會(huì)平均劃分成3 x 100 M,但是物理上每個(gè)物理塊是128M,每個(gè)MapTask進(jìn)行計(jì)算時(shí)需要從另外那個(gè)主機(jī)讀取數(shù)據(jù),跨越主機(jī)讀取數(shù)據(jù)需要進(jìn)行網(wǎng)絡(luò)IO,這是很慢的。
所有MR選擇的是按照128M來(lái)進(jìn)行切分,盡管這樣會(huì)導(dǎo)致劃分的數(shù)據(jù)塊并不是十分均勻,但是對(duì)于網(wǎng)絡(luò)IO的延遲來(lái)說(shuō),還是可以接受的。
Hadoop提交流程源碼和切片源碼
提交源碼主要debug節(jié)點(diǎn)
- job.waitForCompletion()
- JobState枚舉類(lèi),DEFINE、RUNING
- submit()
- connect()
- 匿名內(nèi)部類(lèi)、new方法 無(wú)法進(jìn)入,打好斷點(diǎn),點(diǎn)擊快速運(yùn)行進(jìn)入
- ctrl + alt + 左方向鍵,可以返回原先的位置
- initProviderList(): 添加了本地客戶端協(xié)議和Yarn客戶端協(xié)議
- create(conf): 根據(jù)配置文件來(lái)決定代碼的運(yùn)行環(huán)境(Yarn分布式環(huán)境/本地單機(jī)環(huán)境)
- submitter 根據(jù)運(yùn)行環(huán)境獲取相應(yīng)的提交器
- checkSpecs(job):檢查輸出路徑是否正確
- jobStagingArea: job的臨時(shí)運(yùn)行區(qū)域,給定一個(gè)絕對(duì)路徑的目錄D:\tmp\hadoop\mapred\staging,里面存放了:
- local: 切片結(jié)果+8個(gè)配置文件總和
- yarn:切片結(jié)果+8個(gè)配置文件總和+jar包
- copyAndConfigureFiles(): 讀取任務(wù)需要的支持文件讀取到j(luò)ob的臨時(shí)運(yùn)行環(huán)境中,在Yarn環(huán)境中,會(huì)上傳jar包到該路徑中
- writeSplits():給數(shù)據(jù)添加切片標(biāo)記,實(shí)際還未切分,會(huì)生成切片文件到臨時(shí)區(qū)域中
- writeConf(): 寫(xiě)配置文件到目錄中
提交源碼總結(jié):
- mapreduce.framework.name這個(gè)參數(shù)決定了運(yùn)行環(huán)境
- 切片個(gè)數(shù)決定了MapTask個(gè)數(shù)