Lab objectives:
1. Master the basic MapReduce programming workflow;
2. Master the use of MapReduce serialization.
Lab content:
I. Create a Maven project named MapReduceTest locally, add the required dependencies to pom.xml, configure the log4j.properties file, and set up a Windows development environment. Then implement the following:
(1) Create the package com.nefu.(xingming).maxcount and, following the word-count pattern, write three classes (Mapper, Reducer, and Driver) that compute the highest spending for each student ID.
The input file data.txt has the following format:
          serial number \t student ID \t date \t total spending
The output must have the following format:
          student ID \t highest spending
ZnMapper.java
package com.nefu.zhangna.maxcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ZnMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused output objects, so no new object is allocated per record.
    private Text outk = new Text();
    private IntWritable outv = new IntWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // One input line: serial number \t student ID \t date \t total spending
        String line = value.toString();
        String[] content = line.split("\t");
        String schoolnumber = content[1];
        String totalFee = content[3];
        // Emit (student ID, total spending).
        outk.set(schoolnumber);
        outv.set(Integer.parseInt(totalFee));
        context.write(outk, outv);
    }
}
ZnReducer.java
package com.nefu.zhangna.maxcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ZnReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable outv = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Track the largest value for this student ID (starting from 0 assumes spending is never negative).
        int total = 0;
        for (IntWritable value : values) {
            if (value.get() > total)
                total = value.get();
        }
        outv.set(total);
        context.write(key, outv);
    }
}
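To make the data flow of the Mapper and Reducer easier to follow, here is a minimal plain-Java sketch of the same "maximum per key" logic without any Hadoop classes. The class name MaxSpendingSketch and the sample rows are hypothetical and only serve as an illustration; the column positions follow the data.txt format described above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; this is not part of the Hadoop job.
class MaxSpendingSketch {

    // Mirrors ZnMapper + ZnReducer: column 1 is the key, column 3 is the value,
    // and only the largest value seen for each key is kept.
    static Map<String, Integer> maxPerStudent(List<String> lines) {
        Map<String, Integer> max = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            String studentId = fields[1];
            int totalFee = Integer.parseInt(fields[3]);
            max.merge(studentId, totalFee, Math::max);
        }
        return max;
    }

    public static void main(String[] args) {
        // Hypothetical rows: serial number, student ID, date, total spending.
        List<String> lines = Arrays.asList(
                "1\t2021001\t2023-03-01\t35",
                "2\t2021002\t2023-03-01\t18",
                "3\t2021001\t2023-03-02\t52");
        maxPerStudent(lines).forEach((id, fee) -> System.out.println(id + "\t" + fee));
    }
}
```

For the three sample rows this prints 2021001 with 52 and 2021002 with 18, each separated by a tab, which matches the required output format.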
ZnDriver.java
package com.nefu.zhangna.maxcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class ZnDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // Step (6): uncomment to upload data.txt to HDFS.
        //FileSystem fs=FileSystem.get(new URI("hdfs://hadoop101:8020"),configuration,"hadoop");
        //fs.copyFromLocalFile(new Path("D://mapreducetest//data.txt"),new Path("/zn/data.txt"));
        job.setJarByClass(ZnDriver.class);
        job.setMapperClass(ZnMapper.class);
        job.setReducerClass(ZnReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Final output types, matching the type parameters of ZnReducer.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Step (5): uncomment to use CombineTextInputFormat; otherwise the default is TextInputFormat.class.
        // job.setInputFormatClass(CombineTextInputFormat.class);
        //CombineTextInputFormat.setMaxInputSplitSize(job,4194304); // set to 4 MB
        FileInputFormat.setInputPaths(job, new Path("D:\\mapreducetest\\data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\cluster\\shiyan3-1"));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
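(2) Test the program above and check the results.
Original data
After MapReduce
(3) Check the log: how many splits and how many MapTasks are there? (screenshot)
In the log, "Number of split" shows that there is one split, and "Starting task: attempt_local649325949_0001_m_000000_0" shows that there is one MapTask.
(4) Add the file data1.txt and rerun the program: how many splits and how many MapTasks are there now? (screenshot)
With two input files, the number of splits is 2, and so there are two MapTasks.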
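Why two small files produce two splits can be sketched as follows. The default FileInputFormat splits each file separately: the split size is max(minSize, min(maxSize, blockSize)), and a file is cut only while the remaining bytes exceed 1.1 times the split size. The sketch below reproduces that arithmetic in plain Java. The class name, the file sizes, and the 32 MB local block size are assumptions for illustration, and empty files are not considered.

```java
// Plain-Java sketch of per-file split counting; sizes used in main are hypothetical.
class SplitCountSketch {
    // A file's tail is not cut off if it is at most 10% larger than one split.
    static final double SPLIT_SLOP = 1.1;

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Number of splits for one non-empty file.
    static int splitsForFile(long length, long splitSize) {
        int splits = 0;
        long remaining = length;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits++;
        }
        return splits;
    }

    // Files are never merged, so the total is the sum over all files.
    static int totalSplits(long[] fileLengths, long splitSize) {
        int total = 0;
        for (long length : fileLengths) {
            total += splitsForFile(length, splitSize);
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed values: 32 MB block size, minSize 1, maxSize Long.MAX_VALUE.
        long splitSize = computeSplitSize(32L * 1024 * 1024, 1L, Long.MAX_VALUE);
        long[] sizes = {2048L, 3072L}; // hypothetical sizes of data.txt and data1.txt
        System.out.println("splits = " + totalSplits(sizes, splitSize));
    }
}
```

Because every file yields at least one split of its own, two small files give two splits and therefore two MapTasks, no matter how small they are.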
(5) Use CombineTextInputFormat so that data.txt and data1.txt end up in a single split.
After setting CombineTextInputFormat in the driver class, the log shows only one split.
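The effect of CombineTextInputFormat on small files can be pictured as greedy packing: files are added to the current split until its size reaches the configured maximum (4194304 bytes, i.e. 4 MB, in the driver above). The following plain-Java sketch is a simplified model under the assumption that every file is smaller than the maximum; it ignores node and rack locality and is not Hadoop's actual implementation. The class name and file sizes are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of combining small files into splits; not Hadoop code.
class CombineSplitSketch {

    // Adds files to the current split until its size reaches maxSplitSize,
    // then starts a new split. Leftover files form the last split.
    static List<List<String>> pack(String[] names, long[] sizes, long maxSplitSize) {
        List<List<String>> splits = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentSize = 0;
        for (int i = 0; i < names.length; i++) {
            current.add(names[i]);
            currentSize += sizes[i];
            if (currentSize >= maxSplitSize) {
                splits.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
        }
        if (!current.isEmpty()) {
            splits.add(current);
        }
        return splits;
    }

    public static void main(String[] args) {
        String[] names = {"data.txt", "data1.txt"};
        long[] sizes = {2048L, 3072L}; // hypothetical file sizes in bytes
        List<List<String>> splits = pack(names, sizes, 4194304L);
        System.out.println(splits.size() + " split(s): " + splits);
    }
}
```

With two files of a few kilobytes each, the total stays far below 4 MB, so both land in one split and only one MapTask is started.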
(6) Upload data.txt to HDFS.
(7) Use Maven to package the program into a jar, upload it to the Hadoop cluster, run it, and check whether it runs correctly.
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
Package the program into a jar.


II. Create the package com.nefu.(xingming).serialize and write the ScoreBean, Mapper, Reducer, and Driver classes to compute the average score for each student ID, then write the results to three separate files by grade (enrollment year).
The input file mydata.txt has the following format:
student ID \t name \t score
Output format (3 files in total):
student ID \t name \t average score
MyPartition
package com.nefu.zhangna.serialize;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartition extends Partitioner<Text, ScoreBean> {
    @Override
    public int getPartition(Text text, ScoreBean studentBean, int numPartitions) {
        // The key is the student ID; the enrollment year inside it selects the partition.
        String snum = text.toString();
        int partition;
        if (snum.contains("2021")) {
            partition = 0;
        } else if (snum.contains("2022")) {
            partition = 1;
        } else {
            partition = 2;
        }
        return partition;
    }
}
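The partitioning rule can be tried out without Hadoop. The sketch below copies the decision from MyPartition into a plain static method; the class name GradePartitionSketch and the sample student IDs are hypothetical. With three reduce tasks, partitions 0, 1, and 2 correspond to the output files part-r-00000, part-r-00001, and part-r-00002.

```java
// Plain-Java copy of the partition rule for experimentation; not Hadoop code.
class GradePartitionSketch {

    // Same rule as MyPartition: "2021" -> 0, "2022" -> 1, anything else -> 2.
    static int partitionFor(String studentId) {
        if (studentId.contains("2021")) {
            return 0;
        } else if (studentId.contains("2022")) {
            return 1;
        } else {
            return 2;
        }
    }

    public static void main(String[] args) {
        // Hypothetical student IDs that begin with the enrollment year.
        String[] ids = {"2021001", "2022001", "2023001"};
        for (String id : ids) {
            System.out.println(id + " -> partition " + partitionFor(id));
        }
    }
}
```

One thing to keep in mind: contains matches the year anywhere in the ID, so an ID such as 2022012021 would go to partition 0. If the IDs always start with the enrollment year (an assumption about the data), startsWith would be the stricter check.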
ScoreBean
package com.nefu.zhangna.serialize;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ScoreBean implements Writable {
    private String name;
    private Double score;

    // A no-argument constructor is required so the framework can create the bean via reflection.
    public ScoreBean() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Double getScore() {
        return score;
    }

    public void setScore(Double score) {
        this.score = score;
    }

    // Serialization: the field order here must match the order in readFields.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeDouble(score);
    }

    // Deserialization: read the fields in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.score = in.readDouble();
    }

    // Determines how the value appears in the output file.
    @Override
    public String toString() {
        return this.name + "\t" + this.score;
    }
}
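The key point of the Writable contract is that readFields must read the fields in exactly the order in which write wrote them. A minimal sketch of that round trip using only the JDK's DataOutputStream and DataInputStream is shown below; the class name WritableRoundTripSketch and the sample values are hypothetical, and no Hadoop classes are involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// JDK-only illustration of the write/readFields pairing used by ScoreBean.
class WritableRoundTripSketch {

    // Writes name then score, like ScoreBean.write.
    static byte[] write(String name, double score) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(name);
            out.writeDouble(score);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads name then score, like ScoreBean.readFields, and formats them like toString.
    static String read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String name = in.readUTF();
            double score = in.readDouble();
            return name + "\t" + score;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = write("Li", 88.5); // hypothetical sample values
        System.out.println(read(data));
    }
}
```

If the two reads were swapped, readDouble would consume the bytes that belong to the string, which is why the order in write and readFields has to stay in sync.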
ZnMapper1
package com.nefu.zhangna.serialize;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ZnMapper1 extends Mapper<LongWritable, Text, Text, ScoreBean> {
    private Text outk = new Text();
    private ScoreBean outv = new ScoreBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // One input line: student ID \t name \t score
        String line = value.toString();
        String[] content = line.split("\t");
        String schoolnumber = content[0];
        String name = content[1];
        String score = content[2];
        // Emit (student ID, ScoreBean(name, score)).
        outk.set(schoolnumber);
        outv.setName(name);
        outv.setScore(Double.parseDouble(score));
        context.write(outk, outv);
    }
}
ZnReducer1
package com.nefu.zhangna.serialize;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ZnReducer1 extends Reducer<Text, ScoreBean, Text, ScoreBean> {
    private ScoreBean outv = new ScoreBean();

    @Override
    protected void reduce(Text key, Iterable<ScoreBean> values, Context context) throws IOException, InterruptedException {
        double score = 0; // running sum of scores
        int sum = 0;      // number of records for this student ID
        String name = null;
        for (ScoreBean value : values) {
            sum = sum + 1;
            score = score + value.getScore();
            name = value.getName();
        }
        // Average = sum of scores / number of records.
        outv.setName(name);
        outv.setScore(score / sum);
        context.write(key, outv);
    }
}
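The averaging done by the Reducer can be checked in isolation with a small plain-Java sketch. It keeps a running sum and a count per student ID and divides at the end, which is the same calculation as in ZnReducer1. The class name AverageScoreSketch and the sample rows are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the per-key average; not part of the Hadoop job.
class AverageScoreSketch {

    // Input lines follow the mydata.txt format: student ID \t name \t score.
    static Map<String, Double> averagePerStudent(List<String> lines) {
        Map<String, double[]> sumAndCount = new TreeMap<>(); // [0] = sum, [1] = count
        for (String line : lines) {
            String[] fields = line.split("\t");
            double[] acc = sumAndCount.computeIfAbsent(fields[0], k -> new double[2]);
            acc[0] += Double.parseDouble(fields[2]);
            acc[1] += 1;
        }
        Map<String, Double> averages = new TreeMap<>();
        for (Map.Entry<String, double[]> e : sumAndCount.entrySet()) {
            averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return averages;
    }

    public static void main(String[] args) {
        // Hypothetical sample rows.
        List<String> lines = Arrays.asList(
                "2021001\tLi\t80",
                "2021001\tLi\t90",
                "2022001\tWang\t70");
        averagePerStudent(lines).forEach((id, avg) -> System.out.println(id + "\t" + avg));
    }
}
```

Unlike the maximum in the first part, an average of averages is generally not the overall average, so this Reducer logic could not simply be reused as a Combiner; the sum and the count would have to be carried separately.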
ZnDriver1
package com.nefu.zhangna.serialize;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ZnDriver1 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(ZnDriver1.class);
        job.setMapperClass(ZnMapper1.class);
        job.setReducerClass(ZnReducer1.class);
        // Map output types (the original called setOutputValueClass twice instead of setMapOutputValueClass).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ScoreBean.class);
        // Final output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(ScoreBean.class);
        // Three partitions need three reduce tasks, giving one output file per grade.
        job.setPartitionerClass(MyPartition.class);
        job.setNumReduceTasks(3);
        FileInputFormat.setInputPaths(job, new Path("D:\\mapreducetest\\mydata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\cluster\\serialize"));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}