網(wǎng)站建設(shè)教程實(shí)訓(xùn)心得創(chuàng)建網(wǎng)站免費(fèi)注冊(cè)
文章目錄
- 📚實(shí)驗(yàn)?zāi)康?/li>
- 📚實(shí)驗(yàn)平臺(tái)
- 📚實(shí)驗(yàn)內(nèi)容
- 🐇在本地編寫程序和調(diào)試
- 🥕代碼框架思路
- 🥕代碼實(shí)現(xiàn)
- 🐇在集群上提交作業(yè)并執(zhí)行
- 🥕在集群上提交作業(yè)并執(zhí)行,同本地執(zhí)行相比即需修改路徑。
- 🥕修改后通過expoet,導(dǎo)出jar包,關(guān)注 Main-Class 的設(shè)置!
- 🥕在終端依次輸入以下指令,完成提交
📚實(shí)驗(yàn)?zāi)康?/h1>
倒排索引(Inverted Index)被用來存儲(chǔ)在全文搜索下某個(gè)單詞在一個(gè)文檔或者一組文檔中的存儲(chǔ)位置的映射,是目前幾乎所有支持全文索引的搜索引擎都需要依賴的一個(gè)數(shù)據(jù)結(jié)構(gòu)。通過對(duì)倒排索引的編程實(shí)現(xiàn),熟練掌握 MapReduce 程序在集群上的提交與執(zhí)行過程,加深對(duì) MapReduce 編程框架的理解。
📚實(shí)驗(yàn)平臺(tái)
- 操作系統(tǒng):Linux
- Hadoop 版本:3.2.2
- JDK 版本:1.8
- Java IDE:Eclipse
📚實(shí)驗(yàn)內(nèi)容
關(guān)于倒排索引
🐇在本地編寫程序和調(diào)試
在本地 eclipse 上編寫帶詞頻屬性的對(duì)英文文檔的文檔倒排索引程序,要求程序能夠?qū)崿F(xiàn)對(duì) stop-words(如 a,an,the,in,of 等詞)的去除,能夠統(tǒng)計(jì)單詞在每篇文檔中出現(xiàn)的頻率。文檔數(shù)據(jù)和停詞表可在此鏈接上下載,在偽分布式環(huán)境下完成程序的編寫和調(diào)試。
🥕代碼框架思路
- Map():對(duì)輸入的Text切分為多個(gè)word。這里的
Map()
包含setup()
和map()
。每一次map都伴隨著一次setup,進(jìn)行停詞,篩選那些不需要統(tǒng)計(jì)的。 - Combine():將Map輸出的中間結(jié)果
相同key部分的value累加
,減少向Reduce節(jié)點(diǎn)傳輸?shù)臄?shù)據(jù)量。 - Partition():為了
將同一個(gè)word的鍵值對(duì)發(fā)送到同一個(gè)Reduce節(jié)點(diǎn)
,對(duì)key進(jìn)行臨時(shí)處理。將原key的(word, filename)臨時(shí)拆開,使Partitioner只按照word值進(jìn)行選擇Reduce節(jié)點(diǎn)?;诠V档姆制椒?。 - Reduce():利用每個(gè)Reducer接收到的鍵值對(duì)中,word是排好序的,來進(jìn)行最后的整合。將word#filename拆分開,
將filename與累加和拼到一起
,存在str中。每次比較當(dāng)前的word和上一次的word是否相同
,若相同則將filename和累加和附加到str中,否則輸出:key:word,value:str,并將新的word作為key繼續(xù)。 - 上述reduce()只會(huì)在遇到新word時(shí),處理并輸出前一個(gè)word,故對(duì)于最后一個(gè)word還需要額外的處理。
重載cleanup()
,處理最后一個(gè)word并輸出
倒排索引的Map、Combiner、Partitioner部分就和上圖一樣
- 一個(gè)Map對(duì)應(yīng)一個(gè)Combiner,借助Combiner對(duì)Map輸出進(jìn)行一次初始整合
- 一個(gè)Combiner又對(duì)應(yīng)一個(gè)Partitioner,Partitioner將同一個(gè)word的鍵值對(duì)發(fā)送到同一個(gè)Reduce節(jié)點(diǎn)
🥕代碼實(shí)現(xiàn)
(關(guān)注本地路徑)
package index;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;
import java.util.Vector;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class index
{public static class Map extends Mapper<Object, Text, Text, IntWritable> {/*** setup():讀取停詞表到vector stop_words中*/Vector<String> stop_words;//停詞表protected void setup(Context context) throws IOException {stop_words = new Vector<String>();//初始化停詞表Configuration conf = context.getConfiguration();//讀取停詞表文件BufferedReader reader = new BufferedReader(new InputStreamReader(FileSystem.get(conf).open(new Path("hdfs://localhost:9000/user/hadoop/input/stop_words_eng.txt"))));String line;while ((line = reader.readLine()) != null) {//按行處理StringTokenizer itr=new StringTokenizer(line);while(itr.hasMoreTokens()){//遍歷詞,存入vectorstop_words.add(itr.nextToken());}}reader.close();}/*** map():對(duì)輸入的Text切分為多個(gè)word* 輸入:key:當(dāng)前行偏移位置 value:當(dāng)前行內(nèi)容* 輸出:key:word#filename value:1*/protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {FileSplit fileSplit = (FileSplit) context.getInputSplit();String fileName = fileSplit.getPath().getName();//獲取文件名,轉(zhuǎn)換為小寫String line = value.toString().toLowerCase();//將行內(nèi)容全部轉(zhuǎn)為小寫字母//只保留數(shù)字和字母String new_line="";for(int i = 0; i < line.length(); i ++) {if((line.charAt(i)>=48 && line.charAt(i)<=57) || (line.charAt(i)>=97 && line.charAt(i)<=122)) {//按行處理new_line += line.charAt(i);} else {//其他字符保存為空格new_line +=" ";}}line = new_line.trim();//去掉開頭和結(jié)尾的空格StringTokenizer strToken=new StringTokenizer(line);//按照空格拆分while(strToken.hasMoreTokens()){String str = strToken.nextToken();if(!stop_words.contains(str)) {//不是停詞則輸出key-value對(duì)context.write(new Text(str+"#"+fileName), new IntWritable(1));}}}}public static class Combine extends Reducer<Text, IntWritable, Text, IntWritable> {/*** 將Map輸出的中間結(jié)果相同key部分的value累加,減少向Reduce節(jié)點(diǎn)傳輸?shù)臄?shù)據(jù)量* 輸入:key:word#filename value:1* 輸出:key:word#filename value:累加和(詞頻)*/protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum ++;}context.write(key, new IntWritable(sum));}}public static class Partition extends HashPartitioner<Text, IntWritable> {/*** 為了將同一個(gè)word的鍵值對(duì)發(fā)送到同一個(gè)Reduce節(jié)點(diǎn),對(duì)key進(jìn)行臨時(shí)處理* 將原key的(word, filename)臨時(shí)拆開,使Partitioner只按照word值進(jìn)行選擇Reduce節(jié)點(diǎn)* 基于哈希值的分片方法*/public int getPartition(Text key, IntWritable value, int numReduceTasks) {//第三個(gè)參數(shù)numPartitions表示每個(gè)Mapper的分片數(shù),也就是Reducer的個(gè)數(shù)String term = key.toString().split("#")[0];//獲取word#filename中的wordreturn super.getPartition(new Text(term), value, numReduceTasks);//按照word分配reduce節(jié)點(diǎn) }}public static class Reduce extends Reducer<Text, IntWritable, Text, Text> {/*** Reduce():利用每個(gè)Reducer接收到的鍵值對(duì)中,word是排好序的,來進(jìn)行最后的整合* 將word#filename拆分開,將filename與累加和拼到一起,存在str中* 每次比較當(dāng)前的word和上一次的word是否相同,若相同則將filename和累加和附加到str中,否則輸出:key:word,value:str,并將新的word作為key繼續(xù)* 輸入:* key value* word1#filename 1 [num1,num2,...]* word1#filename 2 [num1,num2,...]* word2#filename 1 [num1,num2,...]* 輸出:* key:word value:<filename1,詞頻><filename2,詞頻>...<total,總詞頻>*/private String lastfile = null;//存儲(chǔ)上一個(gè)filenameprivate String lastword = null;//存儲(chǔ)上一個(gè)wordprivate String str = "";//存儲(chǔ)要輸出的value內(nèi)容private int count = 0;private int totalcount = 0;protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {String[] tokens = key.toString().split("#");//將word和filename存在tokens數(shù)組中if(lastword == null) {lastword = tokens[0];}if(lastfile == null) {lastfile = tokens[1];}if (!tokens[0].equals(lastword)) {//此次word與上次不一樣,則將上次的word進(jìn)行處理并輸出str += "<"+lastfile+","+count+">;<total,"+totalcount+">.";context.write(new Text(lastword), new Text(str));//value部分拼接后輸出lastword = tokens[0];//更新wordlastfile = tokens[1];//更新filenamecount = 0;str="";for (IntWritable val : values) {//累加相同word和filename中出現(xiàn)次數(shù)count += val.get();//轉(zhuǎn)為int}totalcount = count;return;}if(!tokens[1].equals(lastfile)) {//新的文檔str += "<"+lastfile+","+count+">;";lastfile = tokens[1];//更新文檔名count = 0;//重設(shè)count值for (IntWritable value : values){//計(jì)數(shù)count += value.get();//轉(zhuǎn)為int}totalcount += count;return;}//其他情況,只計(jì)算總數(shù)即可for (IntWritable val : values) {count += val.get();totalcount += val.get();}}/*** 上述reduce()只會(huì)在遇到新word時(shí),處理并輸出前一個(gè)word,故對(duì)于最后一個(gè)word還需要額外的處理* 重載cleanup(),處理最后一個(gè)word并輸出*/public void cleanup(Context context) throws IOException, InterruptedException {str += "<"+lastfile+","+count+">;<total,"+totalcount+">.";context.write(new Text(lastword), new Text(str));super.cleanup(context);}}public static void main(String args[]) throws Exception {Configuration conf = new Configuration();conf.set("fs.defaultFS", "hdfs://localhost:9000");if(args.length != 2) {System.err.println("Usage: Relation <in> <out>");System.exit(2);}Job job = Job.getInstance(conf, "InvertedIndex");//設(shè)置環(huán)境參數(shù)job.setJarByClass(index.class);//設(shè)置整個(gè)程序的類名job.setMapperClass(Map.class);//設(shè)置Mapper類job.setCombinerClass(Combine.class);//設(shè)置combiner類job.setPartitionerClass(Partition.class);//設(shè)置Partitioner類job.setReducerClass(Reduce.class);//設(shè)置reducer類job.setOutputKeyClass(Text.class);//設(shè)置Mapper輸出key類型job.setOutputValueClass(IntWritable.class);//設(shè)置Mapper輸出value類型FileInputFormat.addInputPath(job, new Path(args[0]));//輸入文件目錄FileOutputFormat.setOutputPath(job, new Path(args[1]));//輸出文件目錄System.exit(job.waitForCompletion(true) ? 0 : 1);//參數(shù)true表示檢查并打印 Job 和 Task 的運(yùn)行狀況}}
?補(bǔ)充:當(dāng)我們新建一個(gè)Package和Class后運(yùn)行時(shí),可能會(huì)出現(xiàn)如下報(bào)錯(cuò)(主要是在MapReduce編程輸入輸出里會(huì)遇到)
?解決辦法:
- “Run As”選中“Run Configurations…”
- 然后在“Arguments”里輸入
input output
,然后再run就行了。
🐇在集群上提交作業(yè)并執(zhí)行
集群的服務(wù)器地址為 10.102.0.198,用戶主目錄為/home/用戶名,hdfs 目錄為/user/用戶名。集群上的實(shí)驗(yàn)文檔存放目錄為 hdfs://10.102.0.198:9000/input/. 英文停詞表文件存放位置為hdfs://10.102.0.198:9000/stop_words/stop_words_eng.txt。
🥕在集群上提交作業(yè)并執(zhí)行,同本地執(zhí)行相比即需修改路徑。
🥕修改后通過expoet,導(dǎo)出jar包,關(guān)注 Main-Class 的設(shè)置!
- 選中index.java右鍵Export。
- 如下圖選中
JAR file
后點(diǎn)Next。
- 確認(rèn)選中index及其src,
JAR的命名要和class名一樣
,比如這里是index.java,就是class index,也就是index.jar
。然后點(diǎn)Next。
- 到如下頁面,再點(diǎn)Next。
- 在
Main class
那點(diǎn)Browse,選中index。
- 如下圖。
- 最后點(diǎn)finish完成導(dǎo)出,可在文件夾里找到index.jar。雙擊index.jar,在它的
METS-INT
里頭查看Main-Class是否設(shè)置成功。
🥕在終端依次輸入以下指令,完成提交
- 使用
scp InvertedIndex.jar 用戶名@10.102.0.198:/home/用戶名
命令將本地程序提交到 Hadoop 集群 - 通過
ssh 用戶名@10.102.0.198
命令遠(yuǎn)程登錄到 Hadoop 集群進(jìn)行操作; - 使用
hadoop jar InvertedIndex.jar /input /user/用戶名/output
命令在集群上運(yùn)行 Hadoop 作業(yè),指定輸出目錄為自己 hdfs 目錄下的 output。 - 使用
diff 命令
判斷自己的輸出結(jié)果與標(biāo)準(zhǔn)輸出的差異
scp index.jar bigdata_學(xué)號(hào)@10.102.0.198:/home/bigdata_學(xué)號(hào)
ssh bigdata_學(xué)號(hào)@10.102.0.198
hadoop jar index.jar /input /user/bigdata_學(xué)號(hào)/output
diff <(hdfs dfs -cat /output/part-r-00000) <(hdfs dfs -cat /user/bigdata_學(xué)號(hào)/output/part-r-00000)
在瀏覽器中打開 http://10.102.0.198:8088,可以查看集群上作業(yè)的基本執(zhí)行情況。