網(wǎng)站開發(fā)編程的工作方法可以看任何網(wǎng)站的瀏覽器
文章目錄
- 什么是Flume
- Flume的特性
- Flume高級(jí)應(yīng)用場(chǎng)景
- Flume的三大核心組件
- Source:數(shù)據(jù)源
- channel
- sink
- Flume安裝部署
- Flume的使用
- 案例:采集文件內(nèi)容上傳至HDFS
- 案例:采集網(wǎng)站日志上傳至HDFS
- 各種自定義組件
- 例如:自定義source
- 例如:自定義sink
- Flume優(yōu)化
- Flume進(jìn)程監(jiān)控
什么是Flume
Flume是一個(gè)高可用,高可靠,分布式的海量日志采集、聚合和傳輸?shù)南到y(tǒng),能夠有效的收集、聚合、移動(dòng)大量的日志數(shù)據(jù)。其實(shí)通俗一點(diǎn)來說就是Flume是一個(gè)很靠譜,很方便、很強(qiáng)的日志采集工具。他是目前大數(shù)據(jù)領(lǐng)域數(shù)據(jù)采集最常用的一個(gè)框架。為什么它這么香呢?
主要是因?yàn)槭褂肍lume采集數(shù)據(jù)不需要寫一行代碼,注意是一行代碼都不需要,只需要在配置文件中隨便寫幾行配置Flume就會(huì)死心塌地的給你干活了
這個(gè)屬于Flume的一個(gè)非常典型的應(yīng)用場(chǎng)景,使用Flume采集數(shù)據(jù),最終存儲(chǔ)到HDFS上。
左邊的web server表示是一個(gè)web項(xiàng)目,web項(xiàng)目會(huì)產(chǎn)生日志數(shù)據(jù),通過中間的Agent把日志數(shù)據(jù)采集到HDFS中。其中這個(gè)Agent就是我們使用Flume啟動(dòng)的一個(gè)代理,它是一個(gè)持續(xù)傳輸數(shù)據(jù)的服務(wù),數(shù)據(jù)在Agent內(nèi)部的這些組件之間傳輸?shù)幕締挝皇荅vent
從圖中可以看到,Agent是由Source、Channel、Sink這三大組件組成的,這就是Flume中的三大核心組件,其中source是數(shù)據(jù)源,負(fù)責(zé)讀取數(shù)據(jù)。channel是臨時(shí)存儲(chǔ)數(shù)據(jù)的,source會(huì)把讀取到的數(shù)據(jù)臨時(shí)存儲(chǔ)到channel中。sink是負(fù)責(zé)從channel中讀取數(shù)據(jù)的,最終將數(shù)據(jù)寫出去,寫到指定的目的地中
Flume的特性
- 它有一個(gè)簡(jiǎn)單、靈活的基于流的數(shù)據(jù)流結(jié)構(gòu),這個(gè)其實(shí)就是剛才說的Agent內(nèi)部有三大組件,數(shù)據(jù)通過這三大組件流動(dòng)的
- 具有負(fù)載均衡機(jī)制和故障轉(zhuǎn)移機(jī)制
- 一個(gè)簡(jiǎn)單可擴(kuò)展的數(shù)據(jù)模型(Source、Channel、Sink),這幾個(gè)組件是可靈活組合的
Flume高級(jí)應(yīng)用場(chǎng)景
下面這個(gè)圖里面主要演示了Flume的多路輸出,就是可以將采集到的一份數(shù)據(jù)輸出到多個(gè)目的地中,不同目的地的數(shù)據(jù)對(duì)應(yīng)不同的業(yè)務(wù)場(chǎng)景
這個(gè)圖里面一共有兩個(gè)Agent,表示我們啟動(dòng)了2個(gè)Flume的代理,或者可以理解為了啟動(dòng)了2個(gè)flume的進(jìn)程。首先看左邊這個(gè)agent,給他起個(gè)名字叫 foo,這里面有一個(gè)source,source后面接了3個(gè)channel,表示source讀取到的數(shù)據(jù)會(huì)重復(fù)發(fā)送給每個(gè)channel,每個(gè)channel中的數(shù)據(jù)都是一樣的。針對(duì)每個(gè)channel都接了一個(gè)sink,這三個(gè)sink負(fù)責(zé)讀取對(duì)應(yīng)channel中的數(shù)據(jù),并且把數(shù)據(jù)輸出到不同的目的地,
sink1負(fù)責(zé)把數(shù)據(jù)寫到hdfs中
sink2負(fù)責(zé)把數(shù)據(jù)寫到一個(gè)Java消息服務(wù)數(shù)據(jù)隊(duì)列中
sink3負(fù)責(zé)把數(shù)據(jù)寫給另一個(gè)Agent
注意了,Flume中多個(gè)Agent之間是可以連通的,只需要讓前面Agent的sink組件把數(shù)據(jù)寫到下一個(gè)Agent的source組件中即可。
所以sink3就把數(shù)據(jù)輸出到了Agent bar中。在Agent bar中同樣有三個(gè)組件,source組件其實(shí)就獲取到了sink3發(fā)送過來的數(shù)據(jù),然后把數(shù)據(jù)臨時(shí)存儲(chǔ)到自己的channel4中,最終再通過sink組件把數(shù)據(jù)寫到其他地方。這就是這個(gè)場(chǎng)景的應(yīng)用,把采集到的一份數(shù)據(jù)重復(fù)輸出到不同的目的地中。
下面這張圖,這張圖主要表示了flume的匯聚功能,就是多個(gè)Agent采集到的數(shù)據(jù)統(tǒng)一匯聚到一個(gè)Agent
這個(gè)圖里面一共啟動(dòng)了四個(gè)agent,左邊的三個(gè)agent都是負(fù)責(zé)采集對(duì)應(yīng)web服務(wù)器中的日志數(shù)據(jù),數(shù)據(jù)采集過來之后統(tǒng)一發(fā)送給agent4,最后agent4進(jìn)行統(tǒng)一匯總,最終寫入hdfs。
這種架構(gòu)的好處是后期如果要修改最終數(shù)據(jù)的輸出目的地,只需要修改agent4中的sink即可,不需要修改agent1、2、3。但是這種架構(gòu)也有弊端,
- 如果有很多個(gè)agent同時(shí)向agent4寫數(shù)據(jù),那么agent4會(huì)出現(xiàn)性能瓶頸,導(dǎo)致數(shù)據(jù)處理過慢
- 這種架構(gòu)還存在單點(diǎn)故障問題,如果agent4掛了,那么所有的數(shù)據(jù)都斷了。
不過這些問題可以通過flume中的負(fù)載均衡和故障轉(zhuǎn)移機(jī)制解決
Flume的三大核心組件
- Source:數(shù)據(jù)源
- Channel:臨時(shí)存儲(chǔ)數(shù)據(jù)的管道
- Sink:目的地
接下來具體看一下這三大核心組件都是干什么的
Source:數(shù)據(jù)源
Source:數(shù)據(jù)源:通過source組件可以指定讓Flume讀取哪里的數(shù)據(jù),然后將數(shù)據(jù)傳遞給后面的channel
Flume內(nèi)置支持讀取很多種數(shù)據(jù)源,基于文件、基于目錄、基于TCP/UDP端口、基于HTTP、Kafka的等等、當(dāng)然了,如果這里面沒有你喜歡的,他也是支持自定義的
在這我們挑幾個(gè)常用的看一下:
- Exec Source:實(shí)現(xiàn)文件監(jiān)控,可以實(shí)時(shí)監(jiān)控文件中的新增內(nèi)容,類似于linux中的tail -f 效果。
在這需要注意 tail -F 和 tail -f 的區(qū)別
tail -F : 等同于–follow=name --retry,根據(jù)文件名進(jìn)行追蹤,并保持重試,即該文件被刪除或改名后,如果再次創(chuàng)建相同的文件名,會(huì)繼續(xù)追蹤
tail -f :等同于–follow=descriptor,根據(jù)文件描述符進(jìn)行追蹤,當(dāng)文件改名或被刪除,追蹤停止。在實(shí)際工作中我們的日志數(shù)據(jù)一般都會(huì)通過log4j記錄,log4j產(chǎn)生的日志文件名稱是固定的,每天定時(shí)給文件重命名
假設(shè)默認(rèn)log4j會(huì)向access.log文件中寫日志,每當(dāng)凌晨0點(diǎn)的時(shí)候,log4j都會(huì)對(duì)文件進(jìn)行重命名,在access后面添加昨天的日期,然后再創(chuàng)建新的access.log記錄當(dāng)天的新增日志數(shù)據(jù)。這個(gè)時(shí)候如果想要一直監(jiān)控access.log文件中的新增日志數(shù)據(jù)的話,就需要使用tail -F - NetCat TCP/UDP Source: 采集指定端口(tcp、udp)的數(shù)據(jù),可以讀取流經(jīng)端口的每一行數(shù)據(jù)
- Spooling Directory Source:采集文件夾里新增的文件
- Kafka Source:從Kafka消息隊(duì)列中采集數(shù)據(jù)
注意了,前面我們分析的這幾個(gè)source組件,其中execsource 和 kafkasource在實(shí)際工作中是最常見的,可以滿足大部分的數(shù)據(jù)采集需求。
channel
Channel:接受Source發(fā)出的數(shù)據(jù),可以把channel理解為一個(gè)臨時(shí)存儲(chǔ)數(shù)據(jù)的管道。
Channel的類型有很多:內(nèi)存、文件,內(nèi)存+文件、JDBC等
接下來我們來分析一下
- Memory Channel:使用內(nèi)存作為數(shù)據(jù)的存儲(chǔ)
優(yōu)點(diǎn)是效率高,因?yàn)榫筒簧婕按疟PIO
缺點(diǎn)有兩個(gè)
1:可能會(huì)丟數(shù)據(jù),如果Flume的agent掛了,那么channel中的數(shù)據(jù)就丟失了。
2:內(nèi)存是有限的,會(huì)存在內(nèi)存不夠用的情況 - File Channel:使用文件來作為數(shù)據(jù)的存儲(chǔ)
優(yōu)點(diǎn)是數(shù)據(jù)不會(huì)丟失
缺點(diǎn)是效率相對(duì)內(nèi)存來說會(huì)有點(diǎn)慢,但是這個(gè)慢并沒有我們想象中的那么慢,
所以這個(gè)也是比較常用的一種channel。 - Spillable Memory Channel:使用內(nèi)存和文件作為數(shù)據(jù)存儲(chǔ),即先把數(shù)據(jù)存到內(nèi)存中,如果內(nèi)存中數(shù)據(jù)達(dá)到閾值再flush到文件中
優(yōu)點(diǎn):解決了內(nèi)存不夠用的問題。
缺點(diǎn):還是存在數(shù)據(jù)丟失的風(fēng)險(xiǎn)
sink
Sink:從Channel中讀取數(shù)據(jù)并存儲(chǔ)到指定目的地
Sink的表現(xiàn)形式有很多:打印到控制臺(tái)、HDFS、Kafka等,
注意:Channel中的數(shù)據(jù)直到進(jìn)入目的地才會(huì)被刪除,當(dāng)Sink寫入目的地失敗后,可以自動(dòng)重寫,
不會(huì)造成數(shù)據(jù)丟失,這塊是有一個(gè)事務(wù)保證的。
常用的sink組件有:
- Logger Sink:將數(shù)據(jù)作為日志處理,可以選擇打印到控制臺(tái)或者寫到文件中,這個(gè)主要在測(cè)試的時(shí)候使用
- HDFS Sink:將數(shù)據(jù)傳輸?shù)紿DFS中,這個(gè)是比較常見的,主要針對(duì)離線計(jì)算的場(chǎng)景
- Kafka Sink:將數(shù)據(jù)發(fā)送到kafka消息隊(duì)列中,這個(gè)也是比較常見的,主要針對(duì)實(shí)時(shí)計(jì)算場(chǎng)景,數(shù)據(jù)不落盤,實(shí)時(shí)傳輸,最后使用實(shí)時(shí)計(jì)算框架直接處理。
Flume安裝部署
在這里我重新克隆了一臺(tái)Linux機(jī)器,主機(jī)名設(shè)置為bigdata04,ip設(shè)置為192.168.182.103
關(guān)閉防火墻,安裝jdk并配置環(huán)境變量,因?yàn)镕lume是java開發(fā),所以需要依賴jdk環(huán)境。這些工作已經(jīng)提前做好了,繼續(xù)往下面分析
想要安裝Flume,首先需要下載Flume,進(jìn)入Flume的官網(wǎng),找到Download鏈接
安裝包下載好以后上傳到linux機(jī)器的/data/soft目錄下,并且解壓
[root@bigdata04 soft]# ll
total 255844
-rw-r--r--. 1 root root 67938106 May 1 23:27 apache-flume-1.9.0-bin.tar.gz
drwxr-xr-x. 7 10 143 245 Dec 16 2018 jdk1.8
-rw-r--r--. 1 root root 194042837 Apr 6 23:14 jdk-8u202-linux-x64.tar.gz
[root@bigdata04 soft]# tar -zxvf apache-flume-1.9.0-bin.tar.gz
修改盤flume的env環(huán)境變量配置文件
在flume的conf目錄下,修改flume-env.sh.template的名字,去掉后綴template
[root@bigdata04 conf]# mv flume-env.sh.template flume-env.sh
這樣就好了,Flume的安裝是不是很簡(jiǎn)單,這個(gè)時(shí)候我們不需要啟動(dòng)任何進(jìn)程,只有在配置好采集任務(wù)之后才需要啟動(dòng)Flume。
Flume的使用
下面我們就想上手操作Flume,具體該怎么做呢?
先來看一個(gè)入門級(jí)別的Hello World案例。
我們前面說了,啟動(dòng)Flume任務(wù)其實(shí)就是啟動(dòng)一個(gè)Agent,Agent是由source、channel、sink組成的,這些組件在使用的時(shí)候只需要寫幾行配置就可以了
那下面我們就看一下source、channel、sink該如何配置呢?接下來帶著大家看一下官網(wǎng),找到左邊的documentation,查看文檔信息
Flume的操作文檔是非常良心的,整理的非常詳細(xì)
下面有一個(gè)Agent配置的例子:
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
這個(gè)例子中首先定義了source的名字、sink的名字還有channel的名字
下面配置source的相關(guān)參數(shù)
下面配置了sink的相關(guān)參數(shù)
接著配置了channel的相關(guān)參數(shù)
最后把這三個(gè)組件連接到了一起,就是告訴source需要向哪個(gè)channel寫入數(shù)據(jù),告訴sink需要從哪個(gè)channel讀取數(shù)據(jù),這樣source、channel、sink這三個(gè)組件就聯(lián)通了。總結(jié)下來,配置Flume agent的主要流程是這樣的
- 給每個(gè)組件起名字
- 配置每個(gè)組件的相關(guān)參數(shù)
- 把它們聯(lián)通起來
注意了,在Agent中配置的三大組件為什么要這樣寫呢?如果我是第一次使用我也不會(huì)寫啊。
三大組件的配置在文檔中是有詳細(xì)說明的,來看一下,在Flume Sources下面顯示的都是已經(jīng)內(nèi)置支持的Source組件
剛才看的案例中使用的是source類型是netcat,其實(shí)就是NetCat TCP Source,看一下詳細(xì)內(nèi)容
這里面的粗體字體是必選的參數(shù)
第一個(gè)參數(shù)是為了指定source需要向哪個(gè)channel寫數(shù)據(jù),這個(gè)其實(shí)是通用的參數(shù),主要看下面這三個(gè),type、bind、port
- type:類型需要指定為natcat
- bind:指定當(dāng)前機(jī)器的ip,使用hostname也可以
- port:指定當(dāng)前機(jī)器中一個(gè)沒有被使用的端口
指定bind和port表示開啟監(jiān)聽模式,監(jiān)聽指定ip和端口中的數(shù)據(jù),其實(shí)就是開啟了一個(gè)socket的服務(wù)端,等待客戶端連接進(jìn)來寫入數(shù)據(jù)
在這里給agent起名為a1,所以netcat類型的配置如下,這里面還指定了source、channel的名字,并且把source和channel連接到一起了,刨除這幾個(gè)配置之外就剩下了三行配置,就是剛才我們分析的那三個(gè)必填參數(shù)
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
注意了,bind參數(shù)后面指定的ip是四個(gè)0,這個(gè)當(dāng)前機(jī)器的通用ip,因?yàn)橐慌_(tái)機(jī)器可以有多個(gè)ip,例如:內(nèi)網(wǎng)ip、外網(wǎng)ip,如果通過bind參數(shù)指定某一個(gè)ip的話,表示就只監(jiān)聽通過這個(gè)ip發(fā)送過來的數(shù)據(jù)了,這樣會(huì)有局限性,所以可以指定0.0.0.0。下面幾個(gè)參數(shù)都是可選配置,默認(rèn)可以不配置。接著是channel,案例中channel使用的是memory
查看memory channel
這里面只有type是必填項(xiàng),其他都是可選的
最后看一下sink,在案例中sink使用的是logger,對(duì)應(yīng)的就是Logger Sink
logger sink中默認(rèn)也只需要指定type即可
后期我們?nèi)绻胍褂闷渌膬?nèi)置組件,直接到官網(wǎng)文檔這里查找即可,這里面的配置有很多,沒有必要去記,肯定記不住,只要知道到哪里去找就可以。配置文件分析完了,可以把這些配置放到一個(gè)配置文件中,起名叫example.conf,把這個(gè)配置文件放到
[root@bigdata04 ~]# cd /data/soft/apache-flume-1.9.0-bin
[root@bigdata04 apache-flume-1.9.0-bin]# cd conf/
[root@bigdata04 conf]# vi example.conf
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注意了,這個(gè)配置文件中的a1表示是agent的名稱,還有就是port指定的端口必須是未被使用的,可以先查詢一下當(dāng)前機(jī)器使用了哪些端口,端口的可用范圍是1-65535,如果懶得去查的話,就盡量使用偏大一些的端口,這樣被占用的概率就非常低了。
Agent配置好了以后就可以啟動(dòng)了,下面來看一下啟動(dòng)Agent的命令
可以使用命令:
bin/flume-ng agent --name a1 --conf conf --conf-file example.conf -Dflume.ro
這里面使用flume-ng命令
后面指定agent,表示啟動(dòng)一個(gè)Flume的agent代理
--name:指定agent的名字
--conf:指定flume配置文件的根目錄
--conf-file:指定Agent對(duì)應(yīng)的配置文件(包含source、channel、sink配置的文件)
-D:動(dòng)態(tài)添加一些參數(shù),在這里是指定了flume的日志輸出級(jí)別和輸出位置,INFO表示日志級(jí)
其實(shí)agent的啟動(dòng)命令還可以這樣寫
bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
這里面的-n屬于簡(jiǎn)寫,完整的寫法就是–name
-c完整寫法的–conf
-f完整寫法是–conf-file
注意了,由于配置文件里面指定了agent的名稱為a1,所以在–name后面也需要指定a1,還有就是通過–conf-file指定配置文件的時(shí)候需要指定conf目錄下的example.conf配置文件
啟動(dòng)之后會(huì)看到如下信息,表示啟動(dòng)成功,啟動(dòng)成功之后,這個(gè)窗口會(huì)被一直占用,因?yàn)锳gent服務(wù)一直在運(yùn)行,現(xiàn)在屬于一個(gè)前臺(tái)進(jìn)程。
2020-05-02 10:14:56,464 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.so
如果看到提示的有ERROR級(jí)別的日志信息,就需要具體問題具體分析了,一般都是配置文件配置錯(cuò)誤了。接下來我們需要連接到source中通過netcat開啟的socket服務(wù)端克隆一個(gè)bigdata04的會(huì)話,因?yàn)榍懊鎲?dòng)Agent之后,窗口就被占用了使用telnet命令可以連接到指定socket服務(wù),telnet后面的主機(jī)名和端口是根據(jù)example.conf配置文件中配置的
[root@bigdata04 ~]# telnet localhost 44444
-bash: telnet: command not found
[root@bigdata04 ~]# yum install -y telnet
[root@bigdata04 ~]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello world!
OK
此時(shí)Flume中Agent服務(wù)是在前臺(tái)運(yùn)行,這個(gè)服務(wù)實(shí)際工作中需要一直運(yùn)行,所以需要放到后臺(tái)運(yùn)行。
Flume自身沒有提供直接把進(jìn)程放到后臺(tái)執(zhí)行的參數(shù),所以就需要使用咱們前面學(xué)習(xí)的nohup和&了。此時(shí)就不需要指定-Dflume.root.logger=INFO,console參數(shù)了,默認(rèn)情況下flume的日志會(huì)記錄到日志文件中。停掉之前的Agent,重新執(zhí)行。
[root@bigdata04 apache-flume-1.9.0-bin]# nohup bin/flume-ng agent --name a1 -
啟動(dòng)之后,通過jps命令可以查看到一個(gè)application進(jìn)程,這個(gè)就是啟動(dòng)的Agent
案例:采集文件內(nèi)容上傳至HDFS
接下來我們來看一個(gè)工作中的典型案例:
采集文件內(nèi)容上傳至HDFS
需求:采集目錄中已有的文件內(nèi)容,存儲(chǔ)到HDFS
分析:source是要基于目錄的,channel建議使用file,可以保證不丟數(shù)據(jù),sink使用hdfs
下面要做的就是配置Agent了,可以把example.conf拿過來修改一下,新的文件名為file-to-hdfs.conf
首先是基于目錄的source,咱們前面說過,Spooling Directory Source可以實(shí)現(xiàn)目錄監(jiān)控。來看一下這個(gè)Spooling Directory Source。
channels和type肯定是必填的,還有一個(gè)是spoolDir,就是指定一個(gè)監(jiān)控的目錄
看他下面的案例,里面還多指定了一個(gè)fileHeader,這個(gè)我們暫時(shí)也用不到,后面等我們講了Event之后大家就知道這個(gè)fileHeader可以干什么了,先記著有這個(gè)事把。那來配置一下source
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/log/studentDir
接下來是channel了
channel在這里使用基于文件的,可以保證數(shù)據(jù)的安全性
如果針對(duì)采集的數(shù)據(jù),丟個(gè)一兩條對(duì)整體結(jié)果影響不大,只要求采集效率,那么這個(gè)時(shí)候完全可以使用基于內(nèi)存的channel
咱們前面的例子中使用的是基于內(nèi)存的channel,下面我們到文檔中找一下基于文件的channel
根據(jù)這里的例子可知,主要配置checkpointDir和dataDir,因?yàn)檫@兩個(gè)目錄默認(rèn)會(huì)在用戶家目錄下生成,
建議修改到其他地方
- checkpointDir是存放檢查點(diǎn)目錄
- data是存放數(shù)據(jù)的目錄
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/studentDir
a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/studentDir/d
最后是sink
因?yàn)橐騢dfs中輸出數(shù)據(jù),所以可以使用hdfssink
hdfs.path是必填項(xiàng),指定hdfs上的存儲(chǔ)目錄
看這里例子中還指定了filePrefix參數(shù),這個(gè)是一個(gè)文件前綴,會(huì)在hdfs上生成的文件前面加上這個(gè)前綴,這個(gè)屬于可選項(xiàng),有需求的話可以加上一般在這我們需要設(shè)置writeFormat和fileType這兩個(gè)參數(shù)
默認(rèn)情況下writeFormat的值是Writable,建議改為Text,看后面的解釋,如果后期想使用hive或者impala操作這份數(shù)據(jù)的話,必須在生成數(shù)據(jù)之前設(shè)置為Text,Text表示是普通文本數(shù)據(jù)
fileType默認(rèn)是SequenceFile,還支持DataStream 和 CompressedStream ,DataStream 不會(huì)對(duì)輸出數(shù)據(jù)進(jìn)行壓縮,CompressedStream 會(huì)對(duì)輸出數(shù)據(jù)進(jìn)行壓縮,在這里我們先不使用壓縮格式的,所以選擇DataStream
除了這些參數(shù)以外,還有三個(gè)也比較重要hdfs.rollInterval、hdfs.rollSize和hdfs.rollCount
- hdfs.rollInterval默認(rèn)值是30,單位是秒,表示hdfs多長(zhǎng)時(shí)間切分一個(gè)文件,因?yàn)檫@個(gè)采集程序是一直運(yùn)行的,只要有新數(shù)據(jù),就會(huì)被采集到hdfs上面,hdfs默認(rèn)30秒鐘切分出來一個(gè)文件,如果設(shè)置為0表示不按時(shí)間切文件
- hdfs.rollSize默認(rèn)是1024,單位是字節(jié),最終hdfs上切出來的文件大小都是1024字節(jié),如果設(shè)置為0表示不按大小切文件
- hdfs.rollCount默認(rèn)設(shè)置為10,表示每隔10條數(shù)據(jù)切出來一個(gè)文件,如果設(shè)置為0表示不按數(shù)據(jù)條數(shù)切文件這三個(gè)參數(shù),如果都設(shè)置的有值,哪個(gè)條件先滿足就按照哪個(gè)條件都會(huì)執(zhí)行。在實(shí)際工作中一般會(huì)根據(jù)時(shí)間或者文件大小來切分文件,我們之前在工作中是設(shè)置的時(shí)間和文件小相結(jié)合,時(shí)間設(shè)置的是一小時(shí),文件大小設(shè)置的128M,這兩個(gè)哪個(gè)滿足執(zhí)行哪個(gè)所以針對(duì)hdfssink的配置最終是這樣的
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.182.100:9000/flume/studentDir
a1.sinks.k1.hdfs.filePrefix = stu-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
最后把組件連接到一起
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
把Agent的配置保存到flume的conf目錄下的 file-to-hdfs.conf 文件中:
[root@bigdata04 conf]# vi file-to-hdfs.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/log/studentDir
# Use a channel which buffers events in memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/student
a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/studentDir/d
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.182.100:9000/flume/studentDir
a1.sinks.k1.hdfs.filePrefix = stu-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
下面就可以啟動(dòng)agent了,在啟動(dòng)agent之前,先初始化一下測(cè)試數(shù)據(jù)
創(chuàng)建/data/log/studentDir目錄,然后在里面添加一個(gè)文件,class1.dat
class1.dat中存儲(chǔ)的是學(xué)生信息,學(xué)生姓名、年齡、性別
[root@bigdata04 ~]# mkdir -p /data/log/studentDir
[root@bigdata04 ~]# cd /data/log/studentDir
[root@bigdata04 studentDir]# more class1.dat
jack 18 male
jessic 20 female
tom 17 male
啟動(dòng)Hadoop集群
啟動(dòng)Agent,使用在前臺(tái)啟動(dòng)的方式,方便觀察現(xiàn)象
Flume怎么知道哪些文件是新文件呢?它會(huì)不會(huì)重復(fù)讀取同一個(gè)文件的數(shù)據(jù)呢?
不會(huì)的,我們到/data/log/studentDir目錄看一下你就知道了
我們發(fā)現(xiàn)此時(shí)這個(gè)文件已經(jīng)被加了一個(gè)后綴 .COMPLETED ,表示這個(gè)文件已經(jīng)被讀取過了,所以Flume在讀取的時(shí)候會(huì)忽略后綴為 .COMPLETED 的文件。
案例:采集網(wǎng)站日志上傳至HDFS
需求是這樣的,
- 將A和B兩臺(tái)機(jī)器實(shí)時(shí)產(chǎn)生的日志數(shù)據(jù)匯總到機(jī)器C中
- 通過機(jī)器C將數(shù)據(jù)統(tǒng)一上傳至HDFS的指定目錄中
注意:HDFS中的目錄是按天生成的,每天一個(gè)目錄
根據(jù)剛才的需求分析可知,我們一共需要三臺(tái)機(jī)器
這里使用bigdata02和bigdata03采集當(dāng)前機(jī)器上產(chǎn)生的實(shí)時(shí)日志數(shù)據(jù),統(tǒng)一匯總到bigdata04機(jī)器上。其中bigdata02和bigdata03中的source使用基于file的source,ExecSource,因?yàn)橐獙?shí)時(shí)讀取文件中的新增數(shù)據(jù)
channel在這里我們使用基于內(nèi)存的channel,因?yàn)檫@里是采集網(wǎng)站的訪問日志,就算丟一兩條數(shù)據(jù)對(duì)整體結(jié)果影響也不大,我們只希望采集到的數(shù)據(jù)可以快讀進(jìn)入hdfs中,所以就選擇了基于內(nèi)存的channel。
由于bigdata02和bigdata03的數(shù)據(jù)需要快速發(fā)送到bigdata04中,為了快速發(fā)送我們可以通過網(wǎng)絡(luò)直接傳輸,sink建議使用avrosink,avro是一種數(shù)據(jù)序列化系統(tǒng),經(jīng)過它序列化的數(shù)據(jù)傳輸起來效率更高,并且它對(duì)應(yīng)的還有一個(gè)avrosource,avrosink的數(shù)據(jù)可以直接發(fā)送給avrosource,所以他們可以無縫銜接。
這樣bigdata04的source就確定了 使用avrosource、channel還是基于內(nèi)存的channel,sink就使用
hdfssink,因?yàn)槭且騢dfs中寫數(shù)據(jù)的。
這里面的組件,只有execsource、avrosource、avrosink我們還沒有使用過,其他的組件都使用過了。最終需要在每臺(tái)機(jī)器上啟動(dòng)一個(gè)agent,啟動(dòng)的時(shí)候需要注意先后順序,先啟動(dòng)bigdata04上面的,再啟動(dòng)bigdata02和bigdata03上面的。
具體實(shí)現(xiàn)這個(gè)案例
1:在bigdata02上安裝Flume并配置Agent
上傳Flume的安裝包,解壓
[root@bigdata02 soft]# tar -zxvf apache-flume-1.9.0-bin.tar.gz
在flume的conf目錄下,修改flume-env.sh.template的名字,去掉后綴template
[root@bigdata02 soft]# cd apache-flume-1.9.0-bin/conf
[root@bigdata02 conf]# mv flume-env.sh.template flume-env.sh
配置Agent,創(chuàng)建文件 file-to-avro-101.conf
[root@bigdata02 conf] vi file-to-avro-101.conf
# agent的名稱是a1
# 指定source組件、channel組件和Sink組件的名稱
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 配置source組件
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/access.log# 配置channel組件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置sink組件
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.182.103
a1.sinks.k1.port = 45454
# 把組件連接起來
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
這里面的配置沒有特殊配置,直接參考官網(wǎng)文檔就可以搞定
2:在bigdata03上安裝Flume并配置Agent
上傳Flume的安裝包,解壓
配置Agent,創(chuàng)建文件file-to-avro-102.conf
[root@bigdata03 conf] vi file-to-avro-102.conf
# agent的名稱是a1
# 指定source組件、channel組件和Sink組件的名稱
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 配置source組件
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/access.log
# 配置channel組件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置sink組件
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.182.103
a1.sinks.k1.port = 45454
# 把組件連接起來
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3:在bigdata04上安裝Flume并配置Agent
這臺(tái)機(jī)器我們已經(jīng)安裝過Flume了,所以直接配置Agent即可
在指定Agent中sink配置的時(shí)候注意,我們的需求是需要按天在hdfs中創(chuàng)建目錄,并把當(dāng)天的數(shù)據(jù)上傳到當(dāng)天的日期目錄中,這也就意味著hdfssink中的path不能寫死,需要使用變量,動(dòng)態(tài)獲取時(shí)間,查看官方文檔可知,在hdfs的目錄中需要使用%Y%m%d
在這還有一點(diǎn)需要注意的,因?yàn)槲覀冞@里需要抽取時(shí)間,這個(gè)時(shí)間其實(shí)是需要從數(shù)據(jù)里面抽取,咱們前面說過數(shù)據(jù)的基本單位是Event,Event是一個(gè)對(duì)象,后面我們會(huì)詳細(xì)分析,在這里大家先知道它里面包含的既有我們采集到的原始的數(shù)據(jù),還有一個(gè)header屬性,這個(gè)header屬性是一個(gè)key-value結(jié)構(gòu)的,我們現(xiàn)在抽取時(shí)間就需要到event的header中抽取,但是默認(rèn)情況下event的header中是沒有日期的,強(qiáng)行抽取是會(huì)報(bào)錯(cuò)的,會(huì)提示抽取不到,返回空指針異常。
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
那如何向header中添加日期呢? 其實(shí)官方文檔中也說了,可以使用hdfs.useLocalTimeStamp或者時(shí)間攔截器,時(shí)間攔截器我們后面會(huì)講,暫時(shí)最簡(jiǎn)單直接的方式就是使用hdfs.useLocalTimeStamp,這個(gè)屬性的值默認(rèn)為false,需要改為true。
配置Agent,創(chuàng)建文件 avro-to-hdfs.conf
[root@bigdata04 conf] vi avro-to-hdfs.conf
# agent的名稱是a1
# 指定source組件、channel組件和Sink組件的名稱
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 配置source組件
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 45454
# 配置channel組件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置sink組件
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.182.100:9000/access/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = access
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 把組件連接起來
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注意:bigdata02和bigdata03中配置的a1.sinks.k1.port 的值45454需要和bigdata04中配置的三臺(tái)機(jī)器中的Flume Agent都配置好了,在開始啟動(dòng)之前需要先在bigdata02和bigdata03中生成測(cè)試數(shù)據(jù),為了模擬真實(shí)情況,在這里我們就開發(fā)一個(gè)腳本,定時(shí)向文件中寫數(shù)據(jù)
#!/bin/bash
# 循環(huán)向文件中生成數(shù)據(jù)
while [ "1" = "1" ]
do
# 獲取當(dāng)前時(shí)間戳
curr_time=`date +%s`
# 獲取當(dāng)前主機(jī)名
name=`hostname`
echo ${name}_${curr_time} >> /data/log/access.log
# 暫停1秒
sleep 1
done
在bigdata02和bigdata03中使用這個(gè)腳本生成數(shù)據(jù)
首先在bigdata02上創(chuàng)建/data/log目錄,然后創(chuàng)建 generateAccessLog.sh 腳本
[root@bigdata02 ~]# mkdir -p /data/log
[root@bigdata02 ~]# cd /data/log/
[root@bigdata02 log]# vi generateAccessLog.sh
#!/bin/bash
# 循環(huán)向文件中生成數(shù)據(jù)
while [ "1" = "1" ]
do
# 獲取當(dāng)前時(shí)間戳
curr_time=`date +%s`
# 獲取當(dāng)前主機(jī)名
name=`hostname`
echo ${name}_${curr_time} >> /data/log/access.log
# 暫停1秒
sleep 1
done
接著在bigdata03上創(chuàng)建/data/log目錄,然后創(chuàng)建 generateAccessLog.sh 腳本
[root@bigdata03 ~]# mkdir /data/log
[root@bigdata03 ~]# cd /data/log/
[root@bigdata03 log]# vi generateAccessLog.sh
#!/bin/bash
# 循環(huán)向文件中生成數(shù)據(jù)
while [ "1" = "1" ]
do
# 獲取當(dāng)前時(shí)間戳
curr_time=`date +%s`
# 獲取當(dāng)前主機(jī)名
name=`hostname`
echo ${name}_${curr_time} >> /data/log/access.log
# 暫停1秒
sleep 1
done
接下來開始啟動(dòng)相關(guān)的服務(wù)進(jìn)程
首先啟動(dòng)bigdata04上的agent服務(wù)
接下來啟動(dòng)bigdata-02上的agent服務(wù)和shell腳本
[root@bigdata02 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf
[root@bigdata02 log]# sh -x generateAccessLog.sh
最后啟動(dòng)bigdata-03上的agent服務(wù)和shell腳本
[root@bigdata03 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf
[root@bigdata03 log]# sh -x generateAccessLog.sh
驗(yàn)證結(jié)果,查看hdfs上的結(jié)果數(shù)據(jù),在bigdata01上查看
[root@bigdata01 soft]# hdfs dfs -cat /access/20200502/access.1588426157482.tmp
bigdata02_1588426253
bigdata02_1588426254
bigdata02_1588426255
bigdata02_1588426256
bigdata02_1588426257
bigdata02_1588426258
注意:啟動(dòng)之后稍等一會(huì)就可以看到數(shù)據(jù)了,我們觀察數(shù)據(jù)的變化,會(huì)發(fā)現(xiàn)hdfs中數(shù)據(jù)增長(zhǎng)的不是很快,它會(huì)每隔一段時(shí)間添加一批數(shù)據(jù),實(shí)時(shí)性好像沒那么高?
這是因?yàn)閍vrosink中有一個(gè)配置batch-size,它的默認(rèn)值是100,也就是每次發(fā)送100條數(shù)據(jù),如果數(shù)據(jù)不夠100條,則不發(fā)送。
具體這個(gè)值設(shè)置多少合適,要看你source數(shù)據(jù)源大致每秒產(chǎn)生多少數(shù)據(jù),以及你希望的延遲要達(dá)到什么程度,如果這個(gè)值設(shè)置太小的話,會(huì)造成sink頻繁向外面寫數(shù)據(jù),這樣也會(huì)影響性能。最終,依次停止bigdata02、bigdata03中的服務(wù),最后停止bigdata04中的服務(wù)
各種自定義組件
咱們前面講了很多組件,有核心組件和高級(jí)組件
source、channel、sink以及Source Interceptors,Channel Selectors、Sink Processors
針對(duì)這些組件,Flume都內(nèi)置提供了組件的很多具體實(shí)現(xiàn),在實(shí)際工作中,95%以上的數(shù)據(jù)采集需求都是可以滿足的,但是誰也不敢保證100%都能滿足,因?yàn)槭裁雌孑獾男枨蠖紩?huì)有,那針對(duì)系統(tǒng)內(nèi)沒有提供的一些組件怎么辦呢?
假設(shè)我們想把flume采集到的數(shù)據(jù)輸出到mysql中,那這個(gè)時(shí)候就需要有針對(duì)mysql的sink組件了,但是Flume中并沒有,因?yàn)檫@種需求不常見,往mysql中寫的都是結(jié)構(gòu)化數(shù)據(jù),數(shù)據(jù)的格式是固定的,但是flume采集的一般都是日志數(shù)據(jù),這種屬于非結(jié)構(gòu)化數(shù)據(jù),不支持也是正常的,但是我們?cè)谶@里就是需要使用Flume往mysql中寫數(shù)據(jù),那怎么辦?
要不我們考慮換一個(gè)采集工具把,當(dāng)然這也是一種解決方案,如果有其他采集工具支持向mysql中寫數(shù)據(jù)的話那可以考慮換一個(gè)采集工具,如果所有的采集工具都不支持向mysql中寫數(shù)據(jù)呢,也就是說你這個(gè)需求就是前無古人后無來者的,怎么破?
不用擔(dān)心,天無絕人之路,其實(shí)咱們使用的Flume提供的那些內(nèi)置組件也都是作者一行代碼一行代碼寫出來的,那我們是不是也可以自己寫一個(gè)自定義的組件呢?可以的,并且flume也很歡迎你這樣去做,它把開發(fā)文檔什么的東西都給你準(zhǔn)備好了。
注意了,就算沒有文檔,我們也要想辦法去自定義,沒有文檔的話就需要去摳Flume的源碼了。
在這里Flume針對(duì)自定義組件提供了詳細(xì)的文檔說明,我們來看一下通過Flume User Guide可以看到,針對(duì)source、channle、sink、Source Interceptors,Channel Selectors、都是可以的,這里面都顯示了針對(duì)自定義的組件如何配置使用Sink Processors目前暫時(shí)不支持自定義。
那這些支持自定義的組件具體開發(fā)步驟是什么樣的呢?代碼該寫成什么樣的呢?大家還記得Flume有兩個(gè)文檔鏈接嗎?Flume Developer Guide
例如:自定義source
例如:自定義sink
自定義channel的內(nèi)容目前還沒完善,如果你確實(shí)想自定義這個(gè)組件,就需要到Flume源碼中找到目前支持的那些channel的代碼,參考著實(shí)現(xiàn)我們自定義的channel組件。
大家在這里知道可以自定義,并且知道自定義組件的文檔在哪里就可以了,目前來說,需要我們自定義組件的場(chǎng)景實(shí)在是太少了,幾乎和買彩票中獎(jiǎng)的概率差不多。
前面我們掌握了Flume的基本使用和高級(jí)使用場(chǎng)景,下面我們來看一下針對(duì)Flume的一些企業(yè)級(jí)優(yōu)化和監(jiān)控手段
Flume優(yōu)化
- 調(diào)整Flume進(jìn)程的內(nèi)存大小,建議設(shè)置1G~2G,太小的話會(huì)導(dǎo)致頻繁GC
因?yàn)镕lume進(jìn)程也是基于Java的,所以就涉及到進(jìn)程的內(nèi)存設(shè)置,一般建議啟動(dòng)的單個(gè)Flume進(jìn)程(或者說單個(gè)Agent)內(nèi)存設(shè)置為1G~2G,內(nèi)存太小的話會(huì)頻繁GC,影響Agent的執(zhí)行效率。
那具體設(shè)置多少合適呢?
這個(gè)需求需要根據(jù)Agent讀取的數(shù)據(jù)量的大小和速度有關(guān)系,所以需要具體情況具體分析,當(dāng)Flume的Agent啟動(dòng)之后,對(duì)應(yīng)就會(huì)啟動(dòng)一個(gè)進(jìn)程,我們可以通過jstat -gcutil PID 1000來看看這個(gè)進(jìn)程GC的信息,每一秒鐘刷新一次,如果GC次數(shù)增長(zhǎng)過快,說明內(nèi)存不夠用。使用jps查看目前啟動(dòng)flume進(jìn)程
[root@bigdata04 ~]# jps
2957 Jps
2799 Application
執(zhí)行 jstat -gcutil PID 1000
[root@bigdata04 ~]# jstat -gcutil 2799 1000
S0 S1 E O M CCS YGC YGCT FGC FGCT GCT
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
100.00 0.00 17.54 42.80 96.46 92.38 8 0.029 0 0.000 0
在這里主要看YGC YGCT FGC FGCT GCT
- YGC:表示新生代堆內(nèi)存GC的次數(shù),如果每隔幾十秒產(chǎn)生一次,也還可以接受,如果每秒都會(huì)發(fā)生一次YGC,那說明需要增加內(nèi)存了
- YGCT:表示新生代堆內(nèi)存GC消耗的總時(shí)間
- FGC:FULL GC發(fā)生的次數(shù),注意,如果發(fā)生FUCC GC,則Flume進(jìn)程會(huì)進(jìn)入暫停狀態(tài),FUCC GC執(zhí)行完以后Flume才會(huì)繼續(xù)工作,所以FUCC GC是非常影響效率的,這個(gè)指標(biāo)的值越低越好,沒有更好。
- GCT:所有類型的GC消耗的總時(shí)間
如果需要調(diào)整Flume進(jìn)程內(nèi)存的話,需要調(diào)整 flume-env.s h腳本中的 JAVA_OPTS 參數(shù)把 export JAVA_OPTS 參數(shù)前面的#號(hào)去掉才會(huì)生效。
export JAVA_OPTS="-Xms1024m -Xmx1024m -Dcom.sun.management.jmxremote"
建議這里的 Xms 和 Xmx 設(shè)置為一樣大,避免進(jìn)行內(nèi)存交換,內(nèi)存交換也比較消耗性能。
- 在一臺(tái)服務(wù)器啟動(dòng)多個(gè)agent的時(shí)候,建議修改配置區(qū)分日志文件
因?yàn)樵赾onf目錄下有l(wèi)og4j.properties,在這里面指定了日志文件的名稱和位置,所有使用conf目錄下面配置啟動(dòng)的Agent產(chǎn)生的日志都會(huì)記錄到同一個(gè)日志文件中,如果我們?cè)谝慌_(tái)機(jī)器上啟動(dòng)了10幾個(gè)Agent,后期發(fā)現(xiàn)某一個(gè)Agent掛了,想要查看日志分析問題,這個(gè)時(shí)候就瘋了,因?yàn)樗蠥gent產(chǎn)生的日志都混到一塊了,壓根都沒法分析日志了。
所以建議拷貝多個(gè)conf目錄,然后修改對(duì)應(yīng)conf目錄中l(wèi)og4j.properties日志的文件名稱(可以保證多個(gè)agent的日志分別存儲(chǔ)),并且把日志級(jí)別調(diào)整為warn(減少垃圾日志的產(chǎn)生),默認(rèn)info級(jí)別會(huì)記錄很多日志信息。這樣在啟動(dòng)Agent的時(shí)候分別通過–conf參數(shù)指定不同的conf目錄,后期分析日志就方便了,每一個(gè)Agent都有一個(gè)單獨(dú)的日志文件。
以bigdata04機(jī)器為例:
復(fù)制conf-failover目錄,以后啟動(dòng)sink的failover任務(wù)的時(shí)候使用這個(gè)目錄
修改 log4j.properties中的日志記錄級(jí)別和日志文件名稱,日志文件目錄可以不用修改,統(tǒng)一使用logs目錄即可。
[root@bigdata04 apache-flume-1.9.0-bin]# cp -r conf/ conf-failover
[root@bigdata04 apache-flume-1.9.0-bin]# cd conf-failover/
[root@bigdata04 conf-failover]# vi log4j.properties
.....
flume.root.logger=WARN,LOGFILE
flume.log.dir=./logs
flume.log.file=flume-failover.log
再啟動(dòng)的時(shí)候就是這樣的了
[root@bigdata04 apache-flume-1.9.0-bin]# nohup bin/flume-ng agent --name a1 -
這樣就會(huì)在flume的logs目錄中產(chǎn)生 flume-failover.log 文件,并且文件中只記錄WARN和ERROR級(jí)別
的日志,這樣后期排查日志就很清晰了。
[root@bigdata04 apache-flume-1.9.0-bin]# cd logs/
[root@bigdata04 logs]# ll
total 4
-rw-r--r--. 1 root root 478 May 3 16:25 flume-failover.log
[root@bigdata04 logs]# more flume-failover.log
03 May 2020 16:25:38,992 ERROR [SinkRunner-PollingRunner-FailoverSinkP
rocessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unabl
e to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: All sinks failed to process,
nothing left to failover to
at org.apache.flume.sink.FailoverSinkProcessor.process(Failove
rSinkProcessor.java:194)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.ja
va:145)
at java.lang.Thread.run(Thread.java:748)
Flume進(jìn)程監(jiān)控
Flume的Agent服務(wù)是一個(gè)獨(dú)立的進(jìn)程,假設(shè)我們使用source->channel->sink實(shí)現(xiàn)了一個(gè)數(shù)據(jù)采集落盤的功能,如果這個(gè)采集進(jìn)程被誤操作干掉了,這個(gè)時(shí)候我們是發(fā)現(xiàn)不了的,什么時(shí)候會(huì)發(fā)現(xiàn)呢?
可能第二天,產(chǎn)品經(jīng)理找到你了,說昨天的這個(gè)指標(biāo)值有點(diǎn)偏低啊,你來看下怎么回事,然后你就一頓操作猛如虎,結(jié)果發(fā)現(xiàn)原始數(shù)據(jù)少了一半多,那是因?yàn)镕lume的采集程序在昨天下午的時(shí)候被誤操作干掉了。
找到問題之后,你就苦巴巴的手工去補(bǔ)數(shù)據(jù),重跑計(jì)算程序,最后再找產(chǎn)品經(jīng)理確認(rèn)數(shù)據(jù)的準(zhǔn)確性。類似的問題會(huì)有很多,這說明你現(xiàn)在是無法掌控你手下的這些程序,他們都是不受控的狀態(tài),說不定哪天哪個(gè)程序不高興,他就自殺了,不干活了,過了好幾天,需要用到這個(gè)數(shù)據(jù)的時(shí)候你才發(fā)現(xiàn),發(fā)現(xiàn)的早的話還能補(bǔ)數(shù)據(jù),發(fā)現(xiàn)晚的話數(shù)據(jù)可能都補(bǔ)不回來了,這樣對(duì)公司來說就是屬于比較嚴(yán)重的數(shù)據(jù)故障問題,這樣你年終獎(jiǎng)想拿18薪就不太現(xiàn)實(shí)了。
所以針對(duì)這些存在單點(diǎn)故障的進(jìn)程,我們都需要添加監(jiān)控告警機(jī)制,最起碼出問題能及時(shí)知道,再好一點(diǎn)的呢,可以嘗試自動(dòng)修復(fù)重啟。
那針對(duì)Flume中的Agent我們就來實(shí)現(xiàn)一個(gè)監(jiān)控功能,并且嘗試自動(dòng)重啟
大致思路是這樣的,
- 首先需要有一個(gè)配置文件,配置文件中指定你現(xiàn)在需要監(jiān)控哪些Agent
- 有一個(gè)腳本負(fù)責(zé)讀取配置文件中的內(nèi)容,定時(shí)挨個(gè)檢查Agent對(duì)應(yīng)的進(jìn)程還在不在,如果發(fā)現(xiàn)對(duì)應(yīng)的進(jìn)程不在,則記錄錯(cuò)誤信息,然后告警(發(fā)短信或者發(fā)郵件) 并嘗試重啟
創(chuàng)建一個(gè)文件 monlist.conf文件中的第一列指定一個(gè)Agent的唯一標(biāo)識(shí),后期需要根據(jù)這個(gè)標(biāo)識(shí)過濾對(duì)應(yīng)的Flume進(jìn)程,所以一定要保證至少在一臺(tái)機(jī)器上是唯一的,
等號(hào)后面是一個(gè)啟動(dòng)Flume進(jìn)程的腳本,這個(gè)腳本和Agent的唯一標(biāo)識(shí)是一一對(duì)應(yīng)的,后期如果根據(jù)
Agent標(biāo)識(shí)沒有找到對(duì)應(yīng)的進(jìn)程,那么就需要根據(jù)這個(gè)腳本啟動(dòng)進(jìn)程
example=startExample.sh
這個(gè)腳本的內(nèi)容如下: startExample.sh
#!/bin/bash
flume_path=/data/soft/apache-flume-1.9.0-bin
nohup ${flume_path}/bin/flume-ng agent --name a1 --conf ${flume_path}/conf/ -
接著就是要寫一個(gè)腳本來檢查進(jìn)程在不在,不在的話嘗試重啟
創(chuàng)建腳本 monlist.sh
#!/bin/bash
monlist=`cat monlist.conf`
echo "start check"
for item in ${monlist}
do
# 設(shè)置字段分隔符
OLD_IFS=$IFS
IFS="="
# 把一行內(nèi)容轉(zhuǎn)成多列[數(shù)組]
arr=($item)
# 獲取等號(hào)左邊的內(nèi)容
name=${arr[0]}
# 獲取等號(hào)右邊的內(nèi)容
script=${arr[1]}
echo "time is:"`date +"%Y-%m-%d %H:%M:%S"`" check "$name
if [ `jps -m|grep $name | wc -l` -eq 0 ]
then
# 發(fā)短信或者郵件告警
echo `date +"%Y-%m-%d %H:%M:%S"`$name "is none"
sh -x ./${script}
fi
done
注意:這個(gè)需要定時(shí)執(zhí)行,所以可以使用crontab定時(shí)調(diào)度
* * * * * root /bin/bash /data/soft/monlist.sh