百度蜘蛛抓取新網(wǎng)站亞馬遜關(guān)鍵詞快速優(yōu)化
說明
以下內(nèi)容僅供參考,提到不代表考到,請結(jié)合實(shí)際情況自己復(fù)習(xí)
目錄
說明
一、題型及分值
二、綜合案例題-部署Hadoop集群 或 部署Hadoop HA集群
案例 1:Hadoop 基礎(chǔ)集群部署
案例 2:Hadoop HA 集群部署
案例 3:集群性能優(yōu)化
案例 4:故障排查與恢復(fù)
案例 5:多集群協(xié)同???????
三、名稱解釋(8選5)
1.什么是大數(shù)據(jù)
2.大數(shù)據(jù)的5V特征
3.什么是SSH
4.HDFS(p32)
5.名稱節(jié)點(diǎn)
6.數(shù)據(jù)節(jié)點(diǎn)
7.元數(shù)據(jù)
8.倒排索引
9.單點(diǎn)故障
10.高可用
11.數(shù)據(jù)倉庫
四、簡答題
1、簡述Hadoop的優(yōu)點(diǎn)及其含義
2.簡述獨(dú)立模式、偽分布式模式和完全分布式模式部署Hadoop的區(qū)別
3.簡述HDFS的健壯性
4.簡述YARN基本架構(gòu)的組成部分及其作用
5.簡述不同類型ZNode的區(qū)別
6.簡述Hadoop高可用集群初次啟動時的步驟
7.簡述Hive中分區(qū)和桶的作用
五、Hive代碼題
題目: 電商訂單分析
答案
六、HDFS代碼題
課本資料
相關(guān)題目
題目 1: 創(chuàng)建目錄并上傳文件
題目 2: 查看文件信息
題目 3: 數(shù)據(jù)移動與復(fù)制
題目 4: 刪除操作
題目 5: 文件權(quán)限設(shè)置
題目 6: 文件備份與驗(yàn)證
七、MapReduce編程
題目 1: 單詞計數(shù) (Word Count)
題目 2: 最大值求解 (Max Value Finder)
?題目 3: 平均值計算 (Average Calculation)
題目 4: Top K 單詞統(tǒng)計
思路
題目 5: 日志分析
?題目 6: 用戶購買分析
題目 7: 倒排索引 (Inverted Index)
?題目 8: 用戶商品共現(xiàn)分析
?題目 9: 數(shù)據(jù)去重
題目 10: 分組統(tǒng)計 (Group By)
一、題型及分值
1.綜合案例題(35分)
2.名詞解釋(每個3分,共15分)
3.簡答題(每題6分,共30分)
4.編程題(共3題,共20分)
二、綜合案例題-部署Hadoop集群 或 部署Hadoop HA集群
這部分內(nèi)容建議觀看課本
tar -zxvf jdk-8u... -C /export/...
在Linux系統(tǒng)中,tar命令用于打包和解包文件。以下是tar命令中 -zxvf 和 -C 選項(xiàng)的含義:-z:這個選項(xiàng)表示同時通過gzip進(jìn)行壓縮或解壓縮。如果.tar文件實(shí)際上是一個.tar.gz或.tgz文件,這個選項(xiàng)是必要的。
-x:這個選項(xiàng)代表解包(extract)一個.tar文件。
-v:這個選項(xiàng)用于在處理文件時顯示詳細(xì)信息(verbose),它會列出正在處理的文件,這樣用戶可以看到解壓的進(jìn)度和具體內(nèi)容。
-f:這個選項(xiàng)用于指定要處理的文件名。在tar命令中,-f通常是最后一個選項(xiàng),并且后面直接跟著要操作的文件名。
所以,-zxvf組合起來就是告訴tar命令:解壓縮一個用gzip壓縮的.tar.gz文件,并在處理過程中顯示詳細(xì)信息。-C:這個選項(xiàng)告訴tar命令在指定的目錄中解包文件。在上面的命令中,-C /export/…表示將文件解壓到/export/…這個目錄下。
綜上所述,tar -zxvf jdk-8u... -C /export/…命令的作用是:以詳細(xì)方式解壓名為jdk-8u...的gzip壓縮的tar文件,并將其內(nèi)容解壓到/export/…目錄中。注意,jdk-8u...是文件名的占位符,你需要替換為實(shí)際的文件名。
# 驗(yàn)證Hadoop是否安裝成功
bin/hadoop version
# 啟動Hadoop
start-dfs.sh
# 啟動yarn
start-yarn.sh
# 查看hadoop運(yùn)行狀態(tài)
jps# 查看jdk是否安裝成功
java -version
案例 1:Hadoop 基礎(chǔ)集群部署
背景: 某公司計劃部署一個基本的 Hadoop 集群,要求包含一個 NameNode 和多個 DataNode。公司要求能夠順利存儲和處理 10 TB 的數(shù)據(jù)。
問題:
-
設(shè)計集群架構(gòu),明確節(jié)點(diǎn)的數(shù)量和角色分配。
-
描述如何配置
core-site.xml
和hdfs-site.xml
,確保集群可以正常啟動并運(yùn)行。 -
為了實(shí)現(xiàn) MapReduce 作業(yè),如何配置
mapred-site.xml
和yarn-site.xml
? -
如果添加一個新的 DataNode,該如何操作?
答案:
-
設(shè)計集群架構(gòu):
-
1 個 NameNode,2-3 個 DataNode(視服務(wù)器性能與數(shù)據(jù)量調(diào)整)。
-
每個節(jié)點(diǎn)運(yùn)行操作系統(tǒng)(推薦 Linux)。
-
Master 節(jié)點(diǎn)運(yùn)行 NameNode 和 ResourceManager,Slave 節(jié)點(diǎn)運(yùn)行 DataNode 和 NodeManager。
-
-
配置
core-site.xml
和hdfs-site.xml
: -
????????配置 mapred-site.xml
和 yarn-site.xml
:
-
添加新的 DataNode 操作:
-
安裝 Hadoop,配置
core-site.xml
和hdfs-site.xml
指向 NameNode 的地址。 -
格式化 DataNode 數(shù)據(jù)目錄:
hdfs datanode -format
。 -
啟動 DataNode 服務(wù):
hadoop-daemon.sh start datanode
。
-
案例 2:Hadoop HA 集群部署
背景: 為了提高可靠性,公司計劃將現(xiàn)有的單點(diǎn) NameNode 改為高可用(HA)模式。集群中有兩臺主機(jī)分別作為 Active NameNode 和 Standby NameNode,并通過 ZooKeeper 進(jìn)行管理。
問題:
-
描述如何配置 HDFS 的 HA 功能,包括必要的配置文件和關(guān)鍵參數(shù)。
-
配置 JournalNode,解釋其作用及最低運(yùn)行數(shù)量要求。
-
如果 Active NameNode 宕機(jī),系統(tǒng)如何實(shí)現(xiàn)自動切換到 Standby NameNode?需要哪些關(guān)鍵組件?
-
模擬 NameNode 切換的過程,驗(yàn)證 HA 功能是否正常運(yùn)行
-
案例 3:集群性能優(yōu)化
背景: 集群在運(yùn)行大規(guī)模作業(yè)時,經(jīng)常出現(xiàn)作業(yè)延遲或失敗。系統(tǒng)管理員需要優(yōu)化集群性能。
問題:
-
分析可能導(dǎo)致作業(yè)延遲的原因(從硬件、網(wǎng)絡(luò)、配置等角度)。
-
提出三種優(yōu)化 HDFS 性能的策略(例如 Block 大小調(diào)整)。
-
YARN 中如何配置以確保資源分配更加均衡?
-
描述如何使用 Ganglia 或 Prometheus 監(jiān)控 Hadoop 集群性能。
案例 4:故障排查與恢復(fù)
背景: 運(yùn)行中的 Hadoop 集群出現(xiàn)以下問題:
-
某些 DataNode 狀態(tài)為
Dead
。 -
Active NameNode 停止響應(yīng)。
-
某些作業(yè)無法正常調(diào)度。
問題:
-
針對 DataNode 狀態(tài)為
Dead
的問題,描述可能的原因和修復(fù)方法。 -
Active NameNode 停止響應(yīng)時,如何手動切換到 Standby NameNode?
-
某些作業(yè)無法調(diào)度,如何排查和解決 YARN 資源不足的問題?
-
描述如何通過配置快照和備份機(jī)制實(shí)現(xiàn) HDFS 數(shù)據(jù)恢復(fù)。
案例 5:多集群協(xié)同
背景: 公司計劃在兩個地理位置上分別部署 Hadoop 集群 A 和 B,并希望能夠?qū)崿F(xiàn)跨集群的數(shù)據(jù)同步。
問題:
-
描述如何配置 HDFS Federation 實(shí)現(xiàn)多集群協(xié)同。
-
如果需要在集群 A 和集群 B 之間同步數(shù)據(jù),如何利用
DistCp
工具完成? -
為了提高同步效率,可以采取哪些優(yōu)化措施?
-
討論跨集群數(shù)據(jù)傳輸時的安全性考慮及配置(例如數(shù)據(jù)加密)。
三、名稱解釋(8選5)
1.什么是大數(shù)據(jù)
大數(shù)據(jù)是指海量、多樣、快速流轉(zhuǎn)且價值密度低的數(shù)據(jù)集合,其核心價值在于通過先進(jìn)技術(shù)加工處理,實(shí)現(xiàn)數(shù)據(jù)增值。
2.大數(shù)據(jù)的5V特征
大數(shù)據(jù)的特征包括大量(Volume)、真實(shí)(Veracity)、多樣(Variety)、低價值密度(Value)和高速(Velocity)
3.什么是SSH
SSH是一種網(wǎng)絡(luò)協(xié)議,主要用于在不安全網(wǎng)絡(luò)上提供安全的遠(yuǎn)程登錄和其他安全網(wǎng)絡(luò)服務(wù)。它能夠加密網(wǎng)絡(luò)連接,確保在客戶端和服務(wù)器之間傳輸?shù)臄?shù)據(jù)不會輕易被竊取或篡改。
4.HDFS(p32)
HDFS是Hadoop Distributed File System的縮寫,中文稱為Hadoop分布式文件系統(tǒng),專為大規(guī)模數(shù)據(jù)集的處理而設(shè)計,主要用于存儲和管理海量數(shù)據(jù)文件。
5.名稱節(jié)點(diǎn)
課本解釋:NameNode是HDFS集群的名稱節(jié)點(diǎn),通常稱為主節(jié)點(diǎn)。如果NameNode由于故障原因而無法使用,那么用戶就無法訪問HDFS。也就是說,NameNode作為HDFS的主節(jié)點(diǎn),起著至關(guān)重要的部分
gpt解釋:名稱節(jié)點(diǎn)(NameNode)是Hadoop分布式文件系統(tǒng)(HDFS)中的核心組件之一,主要負(fù)責(zé)存儲文件的元數(shù)據(jù)信息和處理客戶端對文件的訪問請求
6.數(shù)據(jù)節(jié)點(diǎn)
DataNode是HDFS集群中的數(shù)據(jù)節(jié)點(diǎn),通常稱為從節(jié)點(diǎn),其主要功能如下:
-
存儲Block
-
根據(jù)NameNode的指令對Block進(jìn)行創(chuàng)建、復(fù)制、刪除等操作
-
定期向NameNode匯報自身存儲的Block列表及健康狀態(tài)
-
負(fù)責(zé)為客戶端發(fā)起的讀寫請求提供服務(wù)
7.元數(shù)據(jù)
MetaData用于記錄HDFS文件系統(tǒng)的相關(guān)信息,這些信息稱為元數(shù)據(jù),元數(shù)據(jù)的內(nèi)容包括文件系統(tǒng)的目錄結(jié)構(gòu)、文件名、文件路徑、文件大小、文件副本數(shù)、文件與Block的映射關(guān)系,以及Block與DataNode的映射關(guān)系等信息
8.倒排索引
倒排索引是文檔檢索系統(tǒng)中最常用的數(shù)據(jù)結(jié)構(gòu),被廣泛應(yīng)用于全文搜索引擎。倒排索引主要用來存儲某個單詞或詞組在一組文檔中的存儲位置的映射,提供了可以根據(jù)內(nèi)容查找文檔的方式,而不是根據(jù)文檔確定內(nèi)容,因此稱為倒排索引。
9.單點(diǎn)故障
在HDFS集群中,NameNode是主節(jié)點(diǎn),它的運(yùn)行狀態(tài)決定著HDFS集群是否可用。然而在Hadoop設(shè)計之初,HDFS集群只能存在一個NameNode節(jié)點(diǎn),這種設(shè)計的缺點(diǎn)是NameNode節(jié)點(diǎn)一旦發(fā)生故障,就會導(dǎo)致HDFS集群不可用,這就是所謂的單點(diǎn)故障問題
10.高可用
Hadoop通過在HDFS集群中配置多個NameNode(一個Active,多個Standby)來確保系統(tǒng)連續(xù)運(yùn)行,當(dāng)Active NameNode故障時,自動選舉新的Active NameNode,防止單點(diǎn)故障。
11.數(shù)據(jù)倉庫
數(shù)據(jù)倉庫是一個面向主題、集成的、相對穩(wěn)定和反映歷史變化的數(shù)據(jù)集合,用于企業(yè)或組織的決策分析。
四、簡答題
有的來自于書本,有的來自于AI(因?yàn)闀緝?nèi)容過多)
1、簡述Hadoop的優(yōu)點(diǎn)及其含義
(1)低成本,可用多臺廉價機(jī)組建集群,分布式處理大數(shù)據(jù),降低成本。(2)高可靠性,自動保存數(shù)據(jù)副本,避免數(shù)據(jù)丟失。(3)高容錯性,自動檢測并應(yīng)對故障,通過任務(wù)轉(zhuǎn)移,防止任務(wù)失敗。(4)高效率,Hadoop可高效的執(zhí)行并行計算,且在各個計算機(jī)中動態(tài)地移動計算。(5)高擴(kuò)展性,可隨時添加更多的計算機(jī),增加集群存儲,計算能力。
2.簡述獨(dú)立模式、偽分布式模式和完全分布式模式部署Hadoop的區(qū)別
? (1)獨(dú)立模式:本地獨(dú)立模式不進(jìn)行任何配置,是Hadoop的默認(rèn)工作模式,所有組件都在同一臺機(jī)器運(yùn)行,適用于學(xué)習(xí)和體驗(yàn)。
? (2)偽分布模式:也是在一臺單機(jī)上運(yùn)行,通過單節(jié)點(diǎn)模擬分布式,但部署的Hadoop集群是一個偽分布式系統(tǒng),適合本地開發(fā)和驗(yàn)證。
? (3)完全分布模式:是一種在多臺計算機(jī)JVM進(jìn)程中運(yùn)行Hadoop集群的工作模式,所有組件分布在多臺機(jī)器上,部署的集群是完全分布式系統(tǒng),適用于生產(chǎn)環(huán)境。
3.簡述HDFS的健壯性
其健壯性可表現(xiàn)為:在HDFS出現(xiàn)故障的情況下可靠的存儲數(shù)據(jù),其運(yùn)用了心跳機(jī)制、副本機(jī)制、數(shù)據(jù)完整性校驗(yàn)、安全模式和快照 5 種策略保證了數(shù)據(jù)存儲的可靠性
4.簡述YARN基本架構(gòu)的組成部分及其作用
YARN 基本架構(gòu)由 ResourceManager、ApplicationMaster、NodeManager 和 Container 組成,其中,ResourceManager 為全局資源管理器,負(fù)責(zé)整個系統(tǒng)的資源管理和分配;ApplicationMaster每個應(yīng)用程序特有的,負(fù)責(zé)單個應(yīng)用程序的管理;NodeManager 負(fù)責(zé)在節(jié)點(diǎn)上啟動和管理;Container(容器);Container 封裝了每個應(yīng)用程序使用的資源。
5.簡述不同類型ZNode的區(qū)別
ZooKeeper中的ZNode類型主要有以下區(qū)別:
-
持久節(jié)點(diǎn):除非手動刪除,否則一直存在。
-
臨時節(jié)點(diǎn):隨客戶端會話結(jié)束而自動刪除,不能有子節(jié)點(diǎn)。
-
順序節(jié)點(diǎn):在持久或臨時節(jié)點(diǎn)基礎(chǔ)上,創(chuàng)建時帶唯一遞增序號,用于記錄創(chuàng)建順序。
6.簡述Hadoop高可用集群初次啟動時的步驟
1.啟動JournalNode
hdfs -- daemon start journalnode
2.格式化HDFS文件系統(tǒng)
hdfs namenode -format
3.同步NameNode
scp -r /export/data/hadoop/namenode/ hadoop2:/export/data/hadoop/
4.格式化ZKFC
hdfs zkfc -formatZK
5.啟動HDFS
start-dfs.sh
6.啟動YARN
start-yarn.sh
7.簡述Hive中分區(qū)和桶的作用
分區(qū):將表數(shù)據(jù)按規(guī)則劃分存儲在不同目錄,減少全表掃描,提高查詢效率。
桶:按規(guī)則將數(shù)據(jù)均勻分布在不同文件中,避免數(shù)據(jù)傾斜,優(yōu)化查詢性能。
五、Hive代碼題
Hive實(shí)踐作業(yè)三
7.5 數(shù)據(jù)庫操作
7.6 表操作1.在hadoop1中執(zhí)行start-dfs.sh和start-yarn.sh分別啟動hdfs和yarn,
保證hadoop完全分布式集群正常啟動,
hadoop1 jps NameNode ResourceManager
hadoop2 jps NodeManager DataNode SecondaryNameNode
hadoop3 jps NodeManager DataNode
2.在hadoop3中執(zhí)行systemctl status mysqld驗(yàn)證mysql80正常啟動
3.在hadoop3中啟動MetaStore服務(wù),hive --service metastore
4.hadoop3復(fù)制會話,啟動HiveServer2服務(wù),hive --service hiveserver2
5.hadoop3再次復(fù)制會話,jps 多了兩個RunJar進(jìn)程,元數(shù)據(jù)存儲系統(tǒng)和HiveServer2正常啟動
6.在hadoop2中,執(zhí)行 hive 登錄
7.hadoop2復(fù)制會話,執(zhí)行 beeline -u jdbc:hive2://hadoop3:10000 -n root 登錄8.在登錄的hive中,輸入:show databases;9.數(shù)據(jù)庫操作
創(chuàng)建數(shù)據(jù)庫 create database homework;
查看數(shù)據(jù)庫 describe homework;
切換數(shù)據(jù)庫 use homework;
10.表操作
complex表的創(chuàng)建
create table complex(
col1 array<int>,
col2 map<int,string>,
col3 struct<a:string,b:int,c:double>,
col4 string,
col5 int
);complex表的查看
desc complex;user_p分區(qū)表的創(chuàng)建
create table user_p (id int, name string)
partitioned by (gender string)
row format delimited fields terminated by ',';user_p分區(qū)表的查看
desc user_p;array_test內(nèi)部表的創(chuàng)建
create table array_test(
name string,
score array<int>
)
row format delimited fields terminated by '@'
collection items terminated by ',';array_test內(nèi)部表導(dǎo)入數(shù)據(jù)zhangshan@89,88,97,80
lisi@90,95,99,97
wangwu@90,77,88,79
zhaoliu@91,79,98,89文本文件array_test.txt需要先創(chuàng)建,在hadoop3上哦load data local inpath '/root/array_test.txt' into table homework.array_test;array_test內(nèi)部表查詢數(shù)據(jù)
select * from homework.array_test;map_test內(nèi)部表的創(chuàng)建
create table map_test(
name string,
score map<string,int>
)
row format delimited fields terminated by '@'
collection items terminated by ','
map keys terminated by ':';map_test內(nèi)部表導(dǎo)入數(shù)據(jù)zhangshan@math:90,english:89,java:88,hive:80
lisi@math:98,english:79,java:96,hive:92
wangwu@math:88,english:86,java:89,hive:88
zhaoliu@math:89,english:78,java:79,hive:77文本文件map_test.txt需要先創(chuàng)建,在hadoop3上哦load data local inpath '/root/map_test.txt' into table homework.map_test;map_test內(nèi)部表查詢數(shù)據(jù)
select * from homework.map_test;select name from homework.map_test;select score from homework.map_test;
題目: 電商訂單分析
某電商公司需要構(gòu)建訂單管理系統(tǒng),并在 Hive 中完成以下任務(wù):
數(shù)據(jù)描述
訂單數(shù)據(jù)由以下信息組成:
-
order_id
(訂單ID, INT) -
customer_id
(客戶ID, INT) -
order_date
(訂單日期, STRING,格式:yyyy-MM-dd) -
order_items
(訂單商品,ARRAY<STRUCT<item_id: INT, item_name: STRING, quantity: INT, price: FLOAT>>) -
order_details
(訂單詳情,MAP<STRING, STRING>,包含鍵值對如payment_method -> credit_card
,delivery_status -> delivered
)
任務(wù)要求:
任務(wù) 1: 內(nèi)部表創(chuàng)建
創(chuàng)建一個內(nèi)部表 orders_internal
,用于存儲上述訂單數(shù)據(jù)。
任務(wù) 2: 外部表創(chuàng)建
創(chuàng)建一個外部表 orders_external
,數(shù)據(jù)存儲在 HDFS 的 /data/orders_external/
目錄下。
任務(wù) 3: 分區(qū)表創(chuàng)建
創(chuàng)建一個按 order_date
分區(qū)的表 orders_partitioned
,優(yōu)化按日期范圍查詢的性能。
任務(wù) 4: 數(shù)據(jù)插入
為三個表分別插入以下樣例數(shù)據(jù):
訂單1:order_id: 101, customer_id: 1, order_date: '2024-12-01'order_items: [{item_id: 201, item_name: 'Laptop', quantity: 1, price: 1000.0},{item_id: 202, item_name: 'Mouse', quantity: 2, price: 25.0}]order_details: {'payment_method': 'credit_card', 'delivery_status': 'delivered'}訂單2:order_id: 102, customer_id: 2, order_date: '2024-12-02'order_items: [{item_id: 203, item_name: 'Keyboard', quantity: 1, price: 50.0}]order_details: {'payment_method': 'paypal', 'delivery_status': 'shipped'}
任務(wù) 5: 查詢操作
-
查詢所有已完成(
delivery_status = delivered
)訂單的客戶ID及商品明細(xì)。 -
查詢每個客戶的總消費(fèi)金額。
答案
任務(wù) 1: 內(nèi)部表創(chuàng)建
CREATE TABLE orders_internal (order_id INT,customer_id INT,order_date STRING,order_items ARRAY<STRUCT<item_id: INT, item_name: STRING, quantity: INT, price: FLOAT>>,order_details MAP<STRING, STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':';
任務(wù) 2: 外部表創(chuàng)建
CREATE EXTERNAL TABLE orders_external (order_id INT,customer_id INT,order_date STRING,order_items ARRAY<STRUCT<item_id: INT, item_name: STRING, quantity: INT, price: FLOAT>>,order_details MAP<STRING, STRING>
)
STORED AS TEXTFILE
LOCATION '/data/orders_external/';
任務(wù) 3: 分區(qū)表創(chuàng)建
CREATE TABLE orders_partitioned (order_id INT,customer_id INT,order_items ARRAY<STRUCT<item_id: INT, item_name: STRING, quantity: INT, price: FLOAT>>,order_details MAP<STRING, STRING>
)
PARTITIONED BY (order_date STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':';
任務(wù) 4: 數(shù)據(jù)插入
插入內(nèi)部表:
INSERT INTO TABLE orders_internal VALUES
(101, 1, '2024-12-01', ARRAY(NAMED_STRUCT('item_id', 201, 'item_name', 'Laptop', 'quantity', 1, 'price', 1000.0),NAMED_STRUCT('item_id', 202, 'item_name', 'Mouse', 'quantity', 2, 'price', 25.0)),MAP('payment_method', 'credit_card', 'delivery_status', 'delivered')
),
(102, 2, '2024-12-02',ARRAY(NAMED_STRUCT('item_id', 203, 'item_name', 'Keyboard', 'quantity', 1, 'price', 50.0)),MAP('payment_method', 'paypal', 'delivery_status', 'shipped')
);
插入外部表: 將數(shù)據(jù)手動存儲在 /data/orders_external/
,格式如下:
101 1 2024-12-01 [{"item_id":201,"item_name":"Laptop","quantity":1,"price":1000.0},{"item_id":202,"item_name":"Mouse","quantity":2,"price":25.0}] {"payment_method":"credit_card","delivery_status":"delivered"}
102 2 2024-12-02 [{"item_id":203,"item_name":"Keyboard","quantity":1,"price":50.0}] {"payment_method":"paypal","delivery_status":"shipped"}
加載數(shù)據(jù)后,直接查詢外部表。
插入分區(qū)表:
INSERT INTO TABLE orders_partitioned PARTITION(order_date='2024-12-01') VALUES
(101, 1, ARRAY(NAMED_STRUCT('item_id', 201, 'item_name', 'Laptop', 'quantity', 1, 'price', 1000.0),NAMED_STRUCT('item_id', 202, 'item_name', 'Mouse', 'quantity', 2, 'price', 25.0)),MAP('payment_method', 'credit_card', 'delivery_status', 'delivered')
);INSERT INTO TABLE orders_partitioned PARTITION(order_date='2024-12-02') VALUES
(102, 2, ARRAY(NAMED_STRUCT('item_id', 203, 'item_name', 'Keyboard', 'quantity', 1, 'price', 50.0)),MAP('payment_method', 'paypal', 'delivery_status', 'shipped')
);
任務(wù) 5: 查詢操作
1. 查詢所有已完成(delivery_status = delivered
)訂單的客戶ID及商品明細(xì):
SELECT customer_id, order_items
FROM orders_internal
WHERE order_details['delivery_status'] = 'delivered';
2. 查詢每個客戶的總消費(fèi)金額:
SELECT customer_id, SUM(total_price) AS total_spent
FROM (SELECT customer_id, item.quantity * item.price AS total_priceFROM orders_internal LATERAL VIEW EXPLODE(order_items) exploded_items AS item
) t
GROUP BY customer_id;
六、HDFS代碼題
課本資料
dfs常用的子命令選項(xiàng)
子命令選項(xiàng) | 功能描述 |
---|---|
-ls | 查看指定目錄信息 |
-du | 查看指定目錄下每個文件和子目錄的大小,子目錄也可以看作單獨(dú)的目錄,因?yàn)樗部?以存在于目錄 |
-mv | 移動到指定文件或目錄 |
-cp | 復(fù)制指定文件或目錄 |
-rm | 刪除指定文件或目錄 |
-put | 將本地文件系統(tǒng)中的指定文件傳到 HDFS 指定目錄 |
-cat | 查看指定文件的內(nèi)容 |
-help | 查看幫助文檔 |
-mkdir | 創(chuàng)建目錄 |
-get | 將 HDFS 的指定文件下載到本地文件系統(tǒng) |
1.查看目錄 /data的信息
hdfs dfs -ls -S /data
-S 按照由大到小的順序顯示指定目錄的內(nèi)容根據(jù)文件內(nèi)容大小,按照由小到大的順序顯示目錄 /data的內(nèi)容,并將默認(rèn)的文件大小格式化為便于查看的格式進(jìn)行顯示
hdfs dfs -ls -r -h /data
-r 根據(jù)文件大小按照由小到大的順序顯示目錄
-h 將默認(rèn)文件大小(字節(jié)數(shù))格式化為便于查看的格式進(jìn)行顯示遞歸顯示目錄/data及其子目錄的信息,信息中僅顯示文件和子目錄的路徑
hdfs dfs -ls -R -C /data
-R 遞歸顯示目錄/data及其子目錄的信息
-C 信息中僅顯示文件和子目錄的路徑2.在HDFS的目錄/data中創(chuàng)建子目錄/dataChild1。并在子目錄/dataChild1中創(chuàng)建子目錄/dataChild2
hdfs dfs -mkdir -p /data/dataChild1/dataChild23.查看/data中每個文件和子目錄的大小,并將默認(rèn)的文件和子目錄大小格式化為便于查看的格式進(jìn)行顯示
hdfs dfs -du -h /data4.將目錄/data中的子目錄/dataChild1 移動到目錄/data/dataChild中
hdfs dfs -mv /dataChild1 /data/dataChild將目錄/data中的文件dataA 重命名為dataA_New
hdfs dfs -mv /data/dataA /data/dataA_New5.將目錄/data下的文件dataA_New 和 dataB復(fù)制到目錄/data/dataChild
hdfs dfs -cp /data/dataA_New /data/dataB_New /data/dataChild將目錄/data下的文件 dataA_New復(fù)制到子目錄/dataChild,并將其重命名為dataA
hdfs dfs -cp /data/dataA_New /data/dataChild/dataA6.刪除目錄/data的子目錄/dataChild
hdfs dfs -rm -r /data/dataChild7.將本地文件系統(tǒng)中/export/data目錄下文件a.txt 和 b.txt上傳到HDFS的目錄/data
hdfs dfs -put /export/data/a.txt /export/data/b.txt /data8.查看目錄/data中的文件a.txt的內(nèi)容
hdfs dfs -cat /data/a.txt9.將HDFS中目錄/data中的文件a.txt和b.txt 下載到本地文件系統(tǒng)/opt目錄下
hdfs dfs -get /data/a.txt /data/b.txt /opt
相關(guān)題目
題目 1: 創(chuàng)建目錄并上傳文件
描述: 假設(shè)你在 HDFS 的根目錄下,需要完成以下操作:
-
在 HDFS 中創(chuàng)建一個名為
/user/yourname/data
的目錄。 -
將本地目錄
/local/data/input.txt
中的文件上傳到剛創(chuàng)建的 HDFS 目錄中。
要求: 編寫 Shell 命令完成以上任務(wù)。
題目 2: 查看文件信息
描述: 假設(shè) HDFS 中已經(jīng)存在目錄 /user/yourname/data/input.txt
,需要完成以下操作:
-
查看該文件的詳細(xì)信息(包括文件權(quán)限、大小等)。
-
顯示該文件的內(nèi)容。
要求: 寫出相應(yīng)的 HDFS Shell 命令。
題目 3: 數(shù)據(jù)移動與復(fù)制
描述:
-
將文件
/user/yourname/data/input.txt
移動到 HDFS 中的/user/yourname/archive/
目錄下。 -
將文件
/user/yourname/archive/input.txt
復(fù)制回/user/yourname/data/
目錄。
要求: 提供具體的 HDFS Shell 命令。
題目 4: 刪除操作
描述: 刪除 HDFS 中的 /user/yourname/data
目錄及其內(nèi)容,并驗(yàn)證該目錄是否被成功刪除。
要求: 寫出執(zhí)行以上操作的 Shell 命令。
題目 5: 文件權(quán)限設(shè)置
描述: 假設(shè) /user/yourname/data/input.txt
文件需要滿足以下權(quán)限要求:
-
文件所有者可以讀寫;
-
文件所在組成員只能讀取;
-
其他用戶無權(quán)限。
要求: 提供修改文件權(quán)限的 HDFS Shell 命令。
題目 6: 文件備份與驗(yàn)證
描述:
-
將 HDFS 中的
/user/yourname/data/input.txt
備份到/user/yourname/backup/input.txt
。 -
驗(yàn)證備份文件與原文件的內(nèi)容是否一致。
要求: 寫出完整的 Shell 命令。
七、MapReduce編程
題目 1: 單詞計數(shù) (Word Count)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCount {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] tokens = value.toString().split("\\s+");for (String token : tokens) {word.set(token);context.write(word, one);}}}public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
題目 2: 最大值求解 (Max Value Finder)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class MaxValue {public static class MaxMapper extends Mapper<Object, Text, Text, IntWritable> {private final static Text keyOut = new Text("Max");public void map(Object key, Text value, Context context) throws IOException, InterruptedException {int num = Integer.parseInt(value.toString());context.write(keyOut, new IntWritable(num));}}public static class MaxReducer extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int max = Integer.MIN_VALUE;for (IntWritable val : values) {max = Math.max(max, val.get());}context.write(key, new IntWritable(max));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "max value");job.setJarByClass(MaxValue.class);job.setMapperClass(MaxMapper.class);job.setReducerClass(MaxReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
?題目 3: 平均值計算 (Average Calculation)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class AverageCalculation {public static class AvgMapper extends Mapper<Object, Text, Text, IntWritable> {private final static Text keyOut = new Text("Average");public void map(Object key, Text value, Context context) throws IOException, InterruptedException {int num = Integer.parseInt(value.toString());context.write(keyOut, new IntWritable(num));}}public static class AvgReducer extends Reducer<Text, IntWritable, Text, Text> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0, count = 0;for (IntWritable val : values) {sum += val.get();count++;}double average = (double) sum / count;context.write(key, new Text(String.format("%.2f", average)));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "average calculation");job.setJarByClass(AverageCalculation.class);job.setMapperClass(AvgMapper.class);job.setReducerClass(AvgReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
題目 4: Top K 單詞統(tǒng)計
答案:通過兩階段 MapReduce 實(shí)現(xiàn):第一階段統(tǒng)計單詞頻率,第二階段從中找出頻率最高的 K 個單詞。
第一階段:統(tǒng)計單詞頻率
這部分代碼與常規(guī)單詞計數(shù)類似。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordFrequency {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] tokens = value.toString().split("\\s+");for (String token : tokens) {word.set(token);context.write(word, one);}}}public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "word frequency");job.setJarByClass(WordFrequency.class);job.setMapperClass(TokenizerMapper.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
第二階段:提取 Top K
思路
第二階段通過輸入第一階段的輸出結(jié)果,將頻率和單詞對交換,按頻率降序排序,選出頻率最高的 K 個單詞
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.util.TreeMap;public class TopKWords {public static class SwapMapper extends Mapper<Object, Text, IntWritable, Text> {private IntWritable frequency = new IntWritable();private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] parts = value.toString().split("\\s+");if (parts.length == 2) {word.set(parts[0]);frequency.set(Integer.parseInt(parts[1]));context.write(frequency, word); // 倒置鍵值對,頻率作為 key}}}public static class TopKReducer extends Reducer<IntWritable, Text, Text, IntWritable> {private TreeMap<Integer, String> topKMap = new TreeMap<>();private int K = 10; // 設(shè)置需要的 Top K 值public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text val : values) {topKMap.put(key.get(), val.toString());if (topKMap.size() > K) {topKMap.remove(topKMap.firstKey()); // 保持大小為 K}}}protected void cleanup(Context context) throws IOException, InterruptedException {for (Integer freq : topKMap.descendingKeySet()) {context.write(new Text(topKMap.get(freq)), new IntWritable(freq));}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "top k words");job.setJarByClass(TopKWords.class);job.setMapperClass(SwapMapper.class);job.setReducerClass(TopKReducer.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
題目 5: 日志分析
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogAnalysis {public static class LogMapper extends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] parts = value.toString().split(" ");if (parts.length > 0) {String ip = parts[0]; // 提取 IPcontext.write(new Text(ip), one);}}}public static class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}context.write(key, new IntWritable(sum));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "log analysis");job.setJarByClass(LogAnalysis.class);job.setMapperClass(LogMapper.class);job.setReducerClass(LogReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
?題目 6: 用戶購買分析
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class UserPurchaseAnalysis {public static class PurchaseMapper extends Mapper<Object, Text, Text, DoubleWritable> {public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] parts = value.toString().split(" ");if (parts.length == 2) {String userId = parts[0];double amount = Double.parseDouble(parts[1]);context.write(new Text(userId), new DoubleWritable(amount));}}}public static class PurchaseReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {double sum = 0.0;for (DoubleWritable val : values) {sum += val.get();}context.write(key, new DoubleWritable(sum));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "user purchase analysis");job.setJarByClass(UserPurchaseAnalysis.class);job.setMapperClass(PurchaseMapper.class);job.setReducerClass(PurchaseReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(DoubleWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
題目 7: 倒排索引 (Inverted Index)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.util.HashSet;public class InvertedIndex {public static class IndexMapper extends Mapper<Object, Text, Text, Text> {private Text word = new Text();private Text documentId = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] line = value.toString().split("\t", 2); // 輸入格式: 文檔ID \t 文本內(nèi)容if (line.length < 2) return;documentId.set(line[0]);String[] words = line[1].split("\\s+");for (String w : words) {word.set(w);context.write(word, documentId);}}}public static class IndexReducer extends Reducer<Text, Text, Text, Text> {public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {HashSet<String> docIds = new HashSet<>();for (Text docId : values) {docIds.add(docId.toString());}context.write(key, new Text(String.join(", ", docIds)));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "inverted index");job.setJarByClass(InvertedIndex.class);job.setMapperClass(IndexMapper.class);job.setReducerClass(IndexReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
?題目 8: 用戶商品共現(xiàn)分析
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.util.ArrayList;public class CoOccurrence {public static class CoOccurrenceMapper extends Mapper<Object, Text, Text, Text> {private Text user = new Text();private Text itemList = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] parts = value.toString().split("\\s+");if (parts.length < 2) return;user.set(parts[0]);itemList.set(String.join(",", parts, 1, parts.length));context.write(user, itemList);}}public static class CoOccurrenceReducer extends Reducer<Text, Text, Text, Text> {public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {ArrayList<String> items = new ArrayList<>();for (Text val : values) {String[] parts = val.toString().split(",");for (String item : parts) {items.add(item);}}for (int i = 0; i < items.size(); i++) {for (int j = i + 1; j < items.size(); j++) {String pair = items.get(i) + " " + items.get(j);context.write(new Text(pair), new Text("1"));}}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "co-occurrence");job.setJarByClass(CoOccurrence.class);job.setMapperClass(CoOccurrenceMapper.class);job.setReducerClass(CoOccurrenceReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
?題目 9: 數(shù)據(jù)去重
???????
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Deduplication {public static class DedupMapper extends Mapper<Object, Text, Text, Text> {public void map(Object key, Text value, Context context) throws IOException, InterruptedException {context.write(value, new Text(""));}}public static class DedupReducer extends Reducer<Text, Text, Text, Text> {public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {context.write(key, new Text(""));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "data deduplication");job.setJarByClass(Deduplication.class);job.setMapperClass(DedupMapper.class);job.setReducerClass(DedupReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
題目 10: 分組統(tǒng)計 (Group By)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class GroupBy {public static class GroupMapper extends Mapper<Object, Text, Text, IntWritable> {public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] parts = value.toString().split("\\s+");if (parts.length == 2) {String group = parts[0];int number = Integer.parseInt(parts[1]);context.write(new Text(group), new IntWritable(number));}}}public static class GroupReducer extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}context.write(key, new IntWritable(sum));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "group by");job.setJarByClass(GroupBy.class);job.setMapperClass(GroupMapper.class);job.setReducerClass(GroupReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}