和女的做那個(gè)視頻網(wǎng)站應(yīng)用商店優(yōu)化
前言
? ? ? ? 學(xué)數(shù)倉(cāng)的時(shí)候發(fā)現(xiàn) flume 落了一點(diǎn),趕緊補(bǔ)齊。
1、Flume 事務(wù)
Source 在往 Channel 發(fā)送數(shù)據(jù)之前會(huì)開(kāi)啟一個(gè) Put 事務(wù):
- doPut:將批量數(shù)據(jù)寫(xiě)入臨時(shí)緩沖區(qū) putList(當(dāng) source 中的數(shù)據(jù)達(dá)到 batchsize 或者 超過(guò)特定的時(shí)間就會(huì)發(fā)送數(shù)據(jù))
- doCommit:檢查 channel 內(nèi)存隊(duì)列是否足夠合并
- doRollback:如果 channel 內(nèi)存隊(duì)列空間不足沒(méi)救回滾數(shù)據(jù)
同樣 Sink 在從 Channel 主動(dòng)拉取數(shù)據(jù)的時(shí)候也會(huì)開(kāi)啟一個(gè) Take 事務(wù):
- doTake:將數(shù)據(jù)讀取到臨時(shí)緩沖區(qū) takeList,并將數(shù)據(jù)發(fā)送到 HDFS
- doCommit:如果數(shù)據(jù)全部發(fā)送成功,就會(huì)清除臨時(shí)緩沖區(qū) taskList
- dooRollback:數(shù)據(jù)發(fā)送過(guò)程如果出現(xiàn)異常,rollback 將臨時(shí)緩沖區(qū)的數(shù)據(jù)歸還給 channel 內(nèi)存隊(duì)列
2、Flume Agent 內(nèi)部原理
注意:只有 source 和 channel 之間可以存在攔截器,channel 和 sink 之間不可以!??
- source 接收數(shù)據(jù),把數(shù)據(jù)封裝成 Event?
- 傳給 channel processor 也就是 channel 處理器
- 把事件傳給攔截器(interceptor),在攔截器這里可以對(duì)數(shù)據(jù)進(jìn)行一些處理(我們?cè)谏弦还?jié)中說(shuō)過(guò),當(dāng)我們的路徑信息中包含時(shí)間的時(shí)候,需要從 Event Header 中讀取時(shí)間信息,如果沒(méi)有就需要我們指定從本地讀取 timestamp,所以這里我們就可以在攔截器這里給我們的 event 添加頭部信息);而且,攔截器可以設(shè)置多個(gè)
- 經(jīng)過(guò)攔截器處理的事件又返回給了 channel processor ,然后 channel processor 把事件傳給 channel 選擇器(channel selector 有兩種類型:Replicating 和 Multiplexing ,Replicating 會(huì)把source 發(fā)送來(lái)的 events 發(fā)往所有 channel,而 multiplexing 可以配置指定發(fā)往哪些 channel)
- 經(jīng)過(guò) channel 選擇器處理后的事件仍然返回給 channel processor
- channel processor 會(huì)根據(jù) channel 選擇器的結(jié)果,發(fā)送給相應(yīng)的 channel(也就是這個(gè)時(shí)候才會(huì)真正的開(kāi)啟 put 事務(wù),之前都是對(duì) event 進(jìn)行簡(jiǎn)單的處理)
- SinkProcessor 負(fù)責(zé)協(xié)調(diào)拉取 channel 中的數(shù)據(jù),它有三種類型:DefaultSinkProcessor、LoadBalancingSinkpProcessor(負(fù)載均衡,也就是多個(gè) Sink 輪詢的方式去讀取 channel 中的數(shù)據(jù))、FailoverSinkProcessor(故障轉(zhuǎn)移,每個(gè) sink 有自己的優(yōu)先級(jí),優(yōu)先級(jí)高的去讀取 channel 中的事件,只有當(dāng)它掛掉的時(shí)候,才會(huì)輪到下一個(gè)優(yōu)先級(jí)的 sink 去讀)。其中 DefaultSinkProcessor 一個(gè) channel 只能綁定一個(gè) Sink,所以它也就沒(méi)有 sink 組的概念。
注意:一個(gè) sink 只可以綁定一個(gè) channel ,但是一個(gè) channel 可以綁定多個(gè) sink!
3、Flume 拓?fù)浣Y(jié)構(gòu)
3.1、簡(jiǎn)單串聯(lián)
官網(wǎng)這段話翻譯過(guò)來(lái)就是:為了將數(shù)據(jù)跨越多個(gè)代理或躍點(diǎn)進(jìn)行傳輸,前一個(gè)代理的接收器(sink)和當(dāng)前躍點(diǎn)的源(source)需要是avro類型,接收器指向源的主機(jī)名(或IP地址)和端口。
這種模式的缺點(diǎn)很好理解,就像串聯(lián)電路,一個(gè)節(jié)點(diǎn)壞了會(huì)影響整個(gè)系統(tǒng)。
3.2、復(fù)制和多路復(fù)用
從官網(wǎng)翻譯過(guò)來(lái)就是:上述示例顯示了一個(gè)名為“foo”的代理源將流程分散到三個(gè)不同的通道。這種分散可以是復(fù)制或多路復(fù)用。在復(fù)制流程的情況下,每個(gè)事件都會(huì)發(fā)送到這三個(gè)通道。對(duì)于多路復(fù)用的情況,當(dāng)事件的屬性與預(yù)配置的值匹配時(shí),事件將被發(fā)送到可用通道的子集。例如,如果事件屬性名為“txnType”設(shè)置為“customer”,則應(yīng)發(fā)送到channel1和channel3,如果為“vendor”,則應(yīng)發(fā)送到channel2,否則發(fā)送到channel3。映射可以在代理的配置文件中設(shè)置。
這種模式相比上面的串聯(lián)模式的優(yōu)點(diǎn)無(wú)非就是可以發(fā)送過(guò)多個(gè)目的地。
3.3、負(fù)載均衡和故障轉(zhuǎn)移
Flume 支持多個(gè) Sink 邏輯上分到一個(gè) Sink 組,sink 組配合不同的 SinkProcessor ,可以實(shí)現(xiàn)負(fù)載均衡和錯(cuò)誤恢復(fù)的功能。
3.4、聚合
這種模式在實(shí)際開(kāi)發(fā)中是經(jīng)常會(huì)用到的,日常web應(yīng)用通常分布在上百個(gè)服務(wù)器,大者甚至上千個(gè)、上萬(wàn)個(gè)服務(wù)器。產(chǎn)生的日志,處理起來(lái)也非常麻煩。用flume的這種組合方式能很好的解決這一問(wèn)題,每臺(tái)服務(wù)器部署一個(gè)flume采集日志,傳送到一個(gè)集中收集日志的 flume,再由此flume上傳到hdfs、hive、hbase等,進(jìn)行日志分析。
4、Flume 企業(yè)開(kāi)發(fā)實(shí)例
4.1、復(fù)制和多路復(fù)用
注意:多路復(fù)用必須配合攔截器使用,因?yàn)樾枰?Event Header 中添加一些信息。
1)案例需求
2)需求分析
- 監(jiān)控文件變動(dòng)我們可以考慮使用 taildir 或者 exec 這兩種 source
- flume-1 sink 需要使用 avro sink 才能傳輸?shù)较乱粋€(gè) flume-2 和 flume-3 的 source
- flume-2 需要上傳數(shù)據(jù)到 HDFS 所以?sink 為 hdfs
- flume-3 需要把數(shù)據(jù)輸出到本地,所以 sink 為 file_roll sink(要保存到本地目錄,這個(gè)目錄就必須提前創(chuàng)建好,它不像 HDFS Sink 會(huì)自動(dòng)幫我們創(chuàng)建)
我們需要實(shí)現(xiàn)三個(gè) flume 作業(yè):
- flume-1 把監(jiān)聽(tīng)到的新日志讀取到 flume-2 和 flume-3 的 source
- flume-2 把日志上傳到 hdfs
- flume-3 把日志寫(xiě)到本地
3)需求實(shí)現(xiàn)
flume-file-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# 將數(shù)據(jù)流復(fù)制給所有 channel 默認(rèn)就是 replicating 所以也可以不用配置
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
# sink 端的 avro 是一個(gè)數(shù)據(jù)發(fā)送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
# 一個(gè) sink 只可以指定一個(gè) channel,但是一個(gè) channel 可以指定多個(gè) sink
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
flume-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
# source 端的 avro 是一個(gè)數(shù)據(jù)接收服務(wù)
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H
#上傳文件的前綴
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照時(shí)間滾動(dòng)文件夾
a2.sinks.k1.hdfs.round = true
#多少時(shí)間單位創(chuàng)建一個(gè)新的文件夾
a2.sinks.k1.hdfs.roundValue = 1
#重新定義時(shí)間單位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時(shí)間戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個(gè) Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#設(shè)置文件類型,可支持壓縮
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一個(gè)新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#設(shè)置每個(gè)文件的滾動(dòng)大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滾動(dòng)與 Event 數(shù)量無(wú)關(guān)
a2.sinks.k1.hdfs.rollCount = 0# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
4)測(cè)試
bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-dir.conf
bin/flume-ng agent -n a1 -c conf/ -f job/group1/flume-file-flumc.conf
bin/flume-ng agent -n a2 -c conf/ -f job/group1/flume-hdfs.conf
查看結(jié)果:
注意:寫(xiě)入本地文件時(shí),當(dāng)一段時(shí)間沒(méi)有新的日志時(shí),它仍然會(huì)創(chuàng)建一個(gè)新的文件,而不像 hdfs sink 即使達(dá)到了設(shè)置的間隔時(shí)間但是沒(méi)有新日志產(chǎn)生,那么它也不會(huì)創(chuàng)建一個(gè)新的文件。
這個(gè)需要注意的就是 hdfs 的端口不要寫(xiě)錯(cuò),比如我的就不是 9870 而是 8020.
4.2、負(fù)載均衡和故障轉(zhuǎn)移
1)案例需求
2)需求分析
- 開(kāi)啟一個(gè)端口 88888 來(lái)發(fā)送數(shù)據(jù)
- 使用 flume-1 監(jiān)聽(tīng)該端口,并發(fā)送到 flume-2 和 flume-3 (需要 flume-1 的 sink 為 avro sink,flume-2 和 flume-3 的 source 為 avro source),flume-2 和 flume-3 發(fā)送日志到控制臺(tái)(flume-2 和 flume-3 的 sink 為 logger sink)
3)需求實(shí)現(xiàn)
flume-nc-flume.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume-flume-console1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = logger# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-flume-console2.conf?
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
?4)案例測(cè)試
bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-nc-flume.conf
關(guān)閉 flume-flume-console1.conf 作業(yè)?
?我們發(fā)現(xiàn),一開(kāi)始我們開(kāi)啟三個(gè) flume 作業(yè),當(dāng)向 netcat 輸入數(shù)據(jù)時(shí),只有 flume-flume-console1.conf 作業(yè)的控制臺(tái)有日志輸出,這是因?yàn)樗膬?yōu)先級(jí)更高,當(dāng)把作業(yè)?flume-flume-console1.conf 關(guān)閉時(shí),再次向端口 44444 發(fā)送數(shù)據(jù),發(fā)現(xiàn)?flume-flume-console2.conf 作業(yè)開(kāi)始輸出。
如果要使用負(fù)載均衡,只需要替換上面 flume-nc-flume.conf 中:
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
替換為:
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.maxTimeOut = 30000
其中,backoff 代表退避,默認(rèn)為 false,?如果當(dāng)前 sink 沒(méi)有拉到數(shù)據(jù),那么接下來(lái)一段時(shí)間就不用這個(gè) sink 。maxTimeOut 代表最大的退避時(shí)間,因?yàn)橥吮苣J(rèn)是指數(shù)增長(zhǎng)的(比如一個(gè) sink 第一次沒(méi)有拉到數(shù)據(jù),需要等 1 s,第二次還沒(méi)拉到,等 2s,第三次等 4s ...),默認(rèn)最大值為 30 s。
4.3、聚合
1)案例需求
- hadoop102 上的 Flume-1 監(jiān)控文件/opt/module/group.log,
- hadoop103 上的 Flume-2 監(jiān)控某一個(gè)端口的數(shù)據(jù)流,
- Flume-1 與 Flume-2 將數(shù)據(jù)發(fā)給?hadoop104 上的 Flume-3,Flume-3 將最終數(shù)據(jù)打印到控制臺(tái)。
注意:主機(jī)只能在 hadoop104 上配,因?yàn)?avro source 在 hadoop104 上,客戶端(hadoop02 和 hadoop103 的 sink)可以遠(yuǎn)程連接,但是服務(wù)端(hadoop104 的 source)只能綁定自己的端口號(hào)。
2)需求實(shí)現(xiàn)
flume-log-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
?flume-nc-flume.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-flume-log.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
3)測(cè)試
向 group.log 文件中追加文本:
注意:hadoop103 這里不能寫(xiě) nc localhost 44444 而要寫(xiě) nc hadoop103 44444! 否則報(bào)錯(cuò):Ncat: Connection refused.
5、自定義 Interceptor
前面我們的多路復(fù)用還沒(méi)有實(shí)現(xiàn),因?yàn)槲覀冋f(shuō)多路復(fù)用必須配合攔截器來(lái)使用,因?yàn)槲覀儽仨氈烂總€(gè) Channel 發(fā)往哪些 Sink,這需要攔截器往 Event Header 中寫(xiě)一些內(nèi)容。
1)案例需求
2)需求分析
在實(shí)際的開(kāi)發(fā)中,一臺(tái)服務(wù)器產(chǎn)生的日志類型可能有很多種,不同類型的日志可能需要發(fā)送到不同的分析系統(tǒng)。此時(shí)會(huì)用到 Flume 拓?fù)浣Y(jié)構(gòu)中的 Multiplexing 結(jié)構(gòu),Multiplexing 的原理是,根據(jù) event 中 Header 的某個(gè) key 的值,將不同的 event 發(fā)送到不同的 Channel中,所以我們需要自定義一個(gè) Interceptor,為不同類型的 event 的 Header 中的 key 賦予不同的值。
在該案例中,我們以端口數(shù)據(jù)模擬日志,以是否包含”lyh”模擬不同類型的日志,我們需要自定義 interceptor 區(qū)分?jǐn)?shù)據(jù)中是否包含”lyh”,將其分別發(fā)往不同的分析系統(tǒng)(Channel)。
?3)需求實(shí)現(xiàn)
自定義攔截器
引入 flume 依賴
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>
package com.lyh.gmall.interceptor;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class TypeInterceptor implements Interceptor {// 存放事件集合private List<Event> addHeaderEvents;@Overridepublic void initialize() {// 初始化存放事件的集合addHeaderEvents = new ArrayList<>();}// 單個(gè)事件攔截@Overridepublic Event intercept(Event event) {// 1. 獲取事件中的 header 信息Map<String, String> headers = event.getHeaders();// 2. 獲取事件中的 body 信息String body = new String(event.getBody());// 3. 根據(jù) body 中是否包含 'lyh' 來(lái)決定發(fā)往哪個(gè) sinkif (body.contains("lyh"))headers.put("type","first");elseheaders.put("type","second");return event;}// 批量事件攔截@Overridepublic List<Event> intercept(List<Event> events) {// 1. 清空集合addHeaderEvents.clear();// 2. 遍歷 eventsfor (Event event : events) {// 3. 給每個(gè)事件添加頭信息addHeaderEvents.add(intercept(event));}return addHeaderEvents;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new TypeInterceptor();}@Overridepublic void configure(Context context) {}}
}
打包放到 flume 安裝目錄的 lib 目錄下:
?
flume 作業(yè)配置
hadoop102:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.lyh.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.first = c1 # 包含 'lyh'
a1.sources.r1.selector.mapping.second = c2 # 不包含 'lyh'# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141a1.sinks.k1.type = loggera1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242a1.sinks.k1.type = loggera1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
4)需求實(shí)現(xiàn)
#hadoop103
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume2.conf -Dflume.root.logger=INFO,console#hadoop104
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume3.conf -Dflume.root.logger=INFO,console#hadoop102
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume1.conf
nc localhost 44444
hadoop102:
hadoop103:
hadoop104:?
可以看到,從 hadoop102 發(fā)送的日志中,包含 "lyh" 的都被發(fā)往 hadoop103 的 4141 端口,其它日志則被發(fā)往 hadoop104 的 4242端口。
6、自定義 Source
自定義 source 用的還是比較少的,畢竟 flume 已經(jīng)提供了很多常用的了。
1)介紹
- getBackOffSleepIncrement() //backoff 步長(zhǎng),當(dāng)從數(shù)據(jù)源拉取數(shù)據(jù)時(shí),拉取不到數(shù)據(jù)的話它不會(huì)一直再去拉取,而是等待,之后每一次再=如果還拉取不到,就會(huì)比上一次多等待步長(zhǎng)單位個(gè)時(shí)間。
- getMaxBackOffSleepInterval()? //backoff 最長(zhǎng)時(shí)間,如果不設(shè)置最長(zhǎng)等待時(shí)間,它最終會(huì)無(wú)限等待,所以需要指定。
- configure(Context context)? //初始化 context(讀取配置文件內(nèi)容)
- process()? //獲取數(shù)據(jù)封裝成 event 并寫(xiě)入 channel,這個(gè)方法將被循環(huán)調(diào)用。
2)需求
3)分析
4)需求實(shí)現(xiàn)
代碼
package com.lyh.source;import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;import java.util.HashMap;
import java.util.Map;public class MySource extends AbstractSource implements Configurable, PollableSource {// 定義配置文件將來(lái)要讀取的字段private Long delay;private String field;@Overridepublic Status process() throws EventDeliveryException {try {// 創(chuàng)建事件頭信息Map<String,String> headerMap = new HashMap<>();// 創(chuàng)建事件SimpleEvent event = new SimpleEvent();// 循環(huán)封裝事件for (int i = 0; i < 5; i++) {// 給事件設(shè)置頭信息event.setHeaders(headerMap);// 給事件設(shè)置內(nèi)容event.setBody((field + i).getBytes());// 將事件寫(xiě)入 channelgetChannelProcessor().processEvent(event);Thread.sleep(delay);}} catch (InterruptedException e) {e.printStackTrace();}return Status.READY;}// 步長(zhǎng)@Overridepublic long getBackOffSleepIncrement() {return 0;}// 最大間隔時(shí)間@Overridepublic long getMaxBackOffSleepInterval() {return 0;}// 初始化配置信息@Overridepublic void configure(Context context) {delay = context.getLong("delay");field = context.getString("field","Hello");}
}
配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = com.lyh.source.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = lyh# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent -n a1 -c conf/ -f job/custom-source.conf -Dflume.root.logger=INFO,console
運(yùn)行結(jié)果:?
7、自定義 Sink
1)介紹
- configure(Context context)//初始化 context(讀取配置文件內(nèi)容)
- process()//從 Channel 讀取獲取數(shù)據(jù)(event),這個(gè)方法將被循環(huán)調(diào)用。
2)需求分析

?3)需求實(shí)現(xiàn)
package com.lyh.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable{private final static Logger LOG = LoggerFactory.getLogger(AbstractSink.class);private String prefix;private String suffix;@Overridepublic Status process() throws EventDeliveryException {// 聲明返回值狀態(tài)信息Status status;// 獲取當(dāng)前 sink 綁定的 channelChannel channel = getChannel();// 獲取事務(wù)Transaction txn = channel.getTransaction();// 聲明事件Event event;// 開(kāi)啟事務(wù)txn.begin();// 讀取 channel 中的事件、直到讀取事件結(jié)束循環(huán)while (true){event = channel.take();if (event!=null) break;}try {// 打印事件LOG.info(prefix + new String(event.getBody()) + suffix);// 事務(wù)提交txn.commit();status = Status.READY;}catch (Exception e){// 遇到異?;貪L事務(wù)txn.rollback();status = Status.BACKOFF;}finally {// 關(guān)閉事務(wù)txn.close();}return null;}// 初始化配置信息@Overridepublic void configure(Context context) {// 帶默認(rèn)值prefix = context.getString("prefix","hello");// 不帶默認(rèn)值suffix = context.getString("suffix");}
}
配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = com.atguigu.MySink
a1.sinks.k1.prefix = lyh:
a1.sinks.k1.suffix = :lyh# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4)測(cè)試
bin/flume-ng agent -n a1 -c conf/ -f job/custom-sink.conf -Dflume.root.logger=INFO,console
運(yùn)行結(jié)果:

總結(jié)
? ? ? ? 自此,flume 的學(xué)習(xí)基本也完了,這一篇雖然不多但也用了大概3天時(shí)間。相比較 kafka、flink,flume 這個(gè)框架還是非常簡(jiǎn)單的,比如我們自己實(shí)現(xiàn)一些 source、sink,都是很簡(jiǎn)單的,沒(méi)有太多復(fù)雜的理解的東西。
? ? ? ? 總之 flume 這個(gè)工具還是多看官網(wǎng)。