中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

和女的做那個(gè)視頻網(wǎng)站應(yīng)用商店優(yōu)化

和女的做那個(gè)視頻網(wǎng)站,應(yīng)用商店優(yōu)化,做網(wǎng)站的收費(fèi),網(wǎng)站設(shè)計(jì)實(shí)用實(shí)例前言 學(xué)數(shù)倉(cāng)的時(shí)候發(fā)現(xiàn) flume 落了一點(diǎn),趕緊補(bǔ)齊。 1、Flume 事務(wù) Source 在往 Channel 發(fā)送數(shù)據(jù)之前會(huì)開(kāi)啟一個(gè) Put 事務(wù): doPut:將批量數(shù)據(jù)寫(xiě)入臨時(shí)緩沖區(qū) putList(當(dāng) source 中的數(shù)據(jù)達(dá)到 batchsize 或者 超過(guò)特定的時(shí)間就會(huì)…

前言

? ? ? ? 學(xué)數(shù)倉(cāng)的時(shí)候發(fā)現(xiàn) flume 落了一點(diǎn),趕緊補(bǔ)齊。

1、Flume 事務(wù)

Source 在往 Channel 發(fā)送數(shù)據(jù)之前會(huì)開(kāi)啟一個(gè) Put 事務(wù):

  1. doPut:將批量數(shù)據(jù)寫(xiě)入臨時(shí)緩沖區(qū) putList(當(dāng) source 中的數(shù)據(jù)達(dá)到 batchsize 或者 超過(guò)特定的時(shí)間就會(huì)發(fā)送數(shù)據(jù))
  2. doCommit:檢查 channel 內(nèi)存隊(duì)列是否足夠合并
  3. doRollback:如果 channel 內(nèi)存隊(duì)列空間不足沒(méi)救回滾數(shù)據(jù)

同樣 Sink 在從 Channel 主動(dòng)拉取數(shù)據(jù)的時(shí)候也會(huì)開(kāi)啟一個(gè) Take 事務(wù):

  1. doTake:將數(shù)據(jù)讀取到臨時(shí)緩沖區(qū) takeList,并將數(shù)據(jù)發(fā)送到 HDFS
  2. doCommit:如果數(shù)據(jù)全部發(fā)送成功,就會(huì)清除臨時(shí)緩沖區(qū) taskList
  3. dooRollback:數(shù)據(jù)發(fā)送過(guò)程如果出現(xiàn)異常,rollback 將臨時(shí)緩沖區(qū)的數(shù)據(jù)歸還給 channel 內(nèi)存隊(duì)列

2、Flume Agent 內(nèi)部原理

注意:只有 source 和 channel 之間可以存在攔截器,channel 和 sink 之間不可以!??

  1. source 接收數(shù)據(jù),把數(shù)據(jù)封裝成 Event?
  2. 傳給 channel processor 也就是 channel 處理器
  3. 把事件傳給攔截器(interceptor),在攔截器這里可以對(duì)數(shù)據(jù)進(jìn)行一些處理(我們?cè)谏弦还?jié)中說(shuō)過(guò),當(dāng)我們的路徑信息中包含時(shí)間的時(shí)候,需要從 Event Header 中讀取時(shí)間信息,如果沒(méi)有就需要我們指定從本地讀取 timestamp,所以這里我們就可以在攔截器這里給我們的 event 添加頭部信息);而且,攔截器可以設(shè)置多個(gè)
  4. 經(jīng)過(guò)攔截器處理的事件又返回給了 channel processor ,然后 channel processor 把事件傳給 channel 選擇器(channel selector 有兩種類型:Replicating 和 Multiplexing ,Replicating 會(huì)把source 發(fā)送來(lái)的 events 發(fā)往所有 channel,而 multiplexing 可以配置指定發(fā)往哪些 channel)
  5. 經(jīng)過(guò) channel 選擇器處理后的事件仍然返回給 channel processor
  6. channel processor 會(huì)根據(jù) channel 選擇器的結(jié)果,發(fā)送給相應(yīng)的 channel(也就是這個(gè)時(shí)候才會(huì)真正的開(kāi)啟 put 事務(wù),之前都是對(duì) event 進(jìn)行簡(jiǎn)單的處理)
  7. SinkProcessor 負(fù)責(zé)協(xié)調(diào)拉取 channel 中的數(shù)據(jù),它有三種類型:DefaultSinkProcessor、LoadBalancingSinkpProcessor(負(fù)載均衡,也就是多個(gè) Sink 輪詢的方式去讀取 channel 中的數(shù)據(jù))、FailoverSinkProcessor(故障轉(zhuǎn)移,每個(gè) sink 有自己的優(yōu)先級(jí),優(yōu)先級(jí)高的去讀取 channel 中的事件,只有當(dāng)它掛掉的時(shí)候,才會(huì)輪到下一個(gè)優(yōu)先級(jí)的 sink 去讀)。其中 DefaultSinkProcessor 一個(gè) channel 只能綁定一個(gè) Sink,所以它也就沒(méi)有 sink 組的概念。

注意:一個(gè) sink 只可以綁定一個(gè) channel ,但是一個(gè) channel 可以綁定多個(gè) sink!

3、Flume 拓?fù)浣Y(jié)構(gòu)

3.1、簡(jiǎn)單串聯(lián)

官網(wǎng)這段話翻譯過(guò)來(lái)就是:為了將數(shù)據(jù)跨越多個(gè)代理或躍點(diǎn)進(jìn)行傳輸,前一個(gè)代理的接收器(sink)和當(dāng)前躍點(diǎn)的源(source)需要是avro類型,接收器指向源的主機(jī)名(或IP地址)和端口。

這種模式的缺點(diǎn)很好理解,就像串聯(lián)電路,一個(gè)節(jié)點(diǎn)壞了會(huì)影響整個(gè)系統(tǒng)。

3.2、復(fù)制和多路復(fù)用

從官網(wǎng)翻譯過(guò)來(lái)就是:上述示例顯示了一個(gè)名為“foo”的代理源將流程分散到三個(gè)不同的通道。這種分散可以是復(fù)制或多路復(fù)用。在復(fù)制流程的情況下,每個(gè)事件都會(huì)發(fā)送到這三個(gè)通道。對(duì)于多路復(fù)用的情況,當(dāng)事件的屬性與預(yù)配置的值匹配時(shí),事件將被發(fā)送到可用通道的子集。例如,如果事件屬性名為“txnType”設(shè)置為“customer”,則應(yīng)發(fā)送到channel1和channel3,如果為“vendor”,則應(yīng)發(fā)送到channel2,否則發(fā)送到channel3。映射可以在代理的配置文件中設(shè)置。

這種模式相比上面的串聯(lián)模式的優(yōu)點(diǎn)無(wú)非就是可以發(fā)送過(guò)多個(gè)目的地。

3.3、負(fù)載均衡和故障轉(zhuǎn)移

Flume 支持多個(gè) Sink 邏輯上分到一個(gè) Sink 組,sink 組配合不同的 SinkProcessor ,可以實(shí)現(xiàn)負(fù)載均衡和錯(cuò)誤恢復(fù)的功能。

3.4、聚合

這種模式在實(shí)際開(kāi)發(fā)中是經(jīng)常會(huì)用到的,日常web應(yīng)用通常分布在上百個(gè)服務(wù)器,大者甚至上千個(gè)、上萬(wàn)個(gè)服務(wù)器。產(chǎn)生的日志,處理起來(lái)也非常麻煩。用flume的這種組合方式能很好的解決這一問(wèn)題,每臺(tái)服務(wù)器部署一個(gè)flume采集日志,傳送到一個(gè)集中收集日志的 flume,再由此flume上傳到hdfs、hivehbase等,進(jìn)行日志分析。

4、Flume 企業(yè)開(kāi)發(fā)實(shí)例

4.1、復(fù)制和多路復(fù)用

注意:多路復(fù)用必須配合攔截器使用,因?yàn)樾枰?Event Header 中添加一些信息。

1)案例需求

使用 Flume-1 監(jiān)控文件變動(dòng),Flume-1 將變動(dòng)內(nèi)容傳遞給 Flume-2,Flume-2 負(fù)責(zé)存儲(chǔ)到 HDFS。同時(shí) Flume-1 將變動(dòng)內(nèi)容傳遞給 Flume-3,Flume-3 負(fù)責(zé)輸出到 Local FileSystem。

2)需求分析

  • 監(jiān)控文件變動(dòng)我們可以考慮使用 taildir 或者 exec 這兩種 source
  • flume-1 sink 需要使用 avro sink 才能傳輸?shù)较乱粋€(gè) flume-2 和 flume-3 的 source
  • flume-2 需要上傳數(shù)據(jù)到 HDFS 所以?sink 為 hdfs
  • flume-3 需要把數(shù)據(jù)輸出到本地,所以 sink 為 file_roll sink(要保存到本地目錄,這個(gè)目錄就必須提前創(chuàng)建好,它不像 HDFS Sink 會(huì)自動(dòng)幫我們創(chuàng)建)

我們需要實(shí)現(xiàn)三個(gè) flume 作業(yè):

  1. flume-1 把監(jiān)聽(tīng)到的新日志讀取到 flume-2 和 flume-3 的 source
  2. flume-2 把日志上傳到 hdfs
  3. flume-3 把日志寫(xiě)到本地

3)需求實(shí)現(xiàn)

flume-file-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# 將數(shù)據(jù)流復(fù)制給所有 channel 默認(rèn)就是 replicating 所以也可以不用配置
a1.sources.r1.selector.type = replicating 
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
# sink 端的 avro 是一個(gè)數(shù)據(jù)發(fā)送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
# 一個(gè) sink 只可以指定一個(gè) channel,但是一個(gè) channel 可以指定多個(gè) sink
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
flume-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
# source 端的 avro 是一個(gè)數(shù)據(jù)接收服務(wù)
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H
#上傳文件的前綴
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照時(shí)間滾動(dòng)文件夾
a2.sinks.k1.hdfs.round = true
#多少時(shí)間單位創(chuàng)建一個(gè)新的文件夾
a2.sinks.k1.hdfs.roundValue = 1
#重新定義時(shí)間單位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時(shí)間戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個(gè) Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#設(shè)置文件類型,可支持壓縮
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一個(gè)新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#設(shè)置每個(gè)文件的滾動(dòng)大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滾動(dòng)與 Event 數(shù)量無(wú)關(guān)
a2.sinks.k1.hdfs.rollCount = 0# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

4)測(cè)試

bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-dir.conf
bin/flume-ng agent -n a1 -c conf/ -f job/group1/flume-file-flumc.conf
bin/flume-ng agent -n a2 -c conf/ -f job/group1/flume-hdfs.conf

查看結(jié)果:

注意:寫(xiě)入本地文件時(shí),當(dāng)一段時(shí)間沒(méi)有新的日志時(shí),它仍然會(huì)創(chuàng)建一個(gè)新的文件,而不像 hdfs sink 即使達(dá)到了設(shè)置的間隔時(shí)間但是沒(méi)有新日志產(chǎn)生,那么它也不會(huì)創(chuàng)建一個(gè)新的文件。

這個(gè)需要注意的就是 hdfs 的端口不要寫(xiě)錯(cuò),比如我的就不是 9870 而是 8020.

4.2、負(fù)載均衡和故障轉(zhuǎn)移

1)案例需求

使用 Flume1 監(jiān)控一個(gè)端口,其 sink 組中的 sink 分別對(duì)接 Flume2 和 Flume3,采用 FailoverSinkProcessor,實(shí)現(xiàn)故障轉(zhuǎn)移的功能。

2)需求分析

  • 開(kāi)啟一個(gè)端口 88888 來(lái)發(fā)送數(shù)據(jù)
  • 使用 flume-1 監(jiān)聽(tīng)該端口,并發(fā)送到 flume-2 和 flume-3 (需要 flume-1 的 sink 為 avro sink,flume-2 和 flume-3 的 source 為 avro source),flume-2 和 flume-3 發(fā)送日志到控制臺(tái)(flume-2 和 flume-3 的 sink 為 logger sink)

3)需求實(shí)現(xiàn)

flume-nc-flume.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume-flume-console1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = logger# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-flume-console2.conf?
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

?4)案例測(cè)試

bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-nc-flume.conf

關(guān)閉 flume-flume-console1.conf 作業(yè)?

?我們發(fā)現(xiàn),一開(kāi)始我們開(kāi)啟三個(gè) flume 作業(yè),當(dāng)向 netcat 輸入數(shù)據(jù)時(shí),只有 flume-flume-console1.conf 作業(yè)的控制臺(tái)有日志輸出,這是因?yàn)樗膬?yōu)先級(jí)更高,當(dāng)把作業(yè)?flume-flume-console1.conf 關(guān)閉時(shí),再次向端口 44444 發(fā)送數(shù)據(jù),發(fā)現(xiàn)?flume-flume-console2.conf 作業(yè)開(kāi)始輸出。

如果要使用負(fù)載均衡,只需要替換上面 flume-nc-flume.conf 中:

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

替換為:

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.maxTimeOut = 30000

其中,backoff 代表退避,默認(rèn)為 false,?如果當(dāng)前 sink 沒(méi)有拉到數(shù)據(jù),那么接下來(lái)一段時(shí)間就不用這個(gè) sink 。maxTimeOut 代表最大的退避時(shí)間,因?yàn)橥吮苣J(rèn)是指數(shù)增長(zhǎng)的(比如一個(gè) sink 第一次沒(méi)有拉到數(shù)據(jù),需要等 1 s,第二次還沒(méi)拉到,等 2s,第三次等 4s ...),默認(rèn)最大值為 30 s。

4.3、聚合

1)案例需求

  • hadoop102 上的 Flume-1 監(jiān)控文件/opt/module/group.log,
  • hadoop103 上的 Flume-2 監(jiān)控某一個(gè)端口的數(shù)據(jù)流,
  • Flume-1 與 Flume-2 將數(shù)據(jù)發(fā)?hadoop104 上的 Flume-3,Flume-3 將最終數(shù)據(jù)打印到控制臺(tái)。

注意:主機(jī)只能在 hadoop104 上配,因?yàn)?avro source 在 hadoop104 上,客戶端(hadoop02 和 hadoop103 的 sink)可以遠(yuǎn)程連接,但是服務(wù)端(hadoop104 的 source)只能綁定自己的端口號(hào)。

2)需求實(shí)現(xiàn)

flume-log-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
?flume-nc-flume.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-flume-log.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

3)測(cè)試

向 group.log 文件中追加文本:

注意:hadoop103 這里不能寫(xiě) nc localhost 44444 而要寫(xiě) nc hadoop103 44444! 否則報(bào)錯(cuò):Ncat: Connection refused.

5、自定義 Interceptor

前面我們的多路復(fù)用還沒(méi)有實(shí)現(xiàn),因?yàn)槲覀冋f(shuō)多路復(fù)用必須配合攔截器來(lái)使用,因?yàn)槲覀儽仨氈烂總€(gè) Channel 發(fā)往哪些 Sink,這需要攔截器往 Event Header 中寫(xiě)一些內(nèi)容。

1)案例需求

使用 Flume 采集服務(wù)器本地日志,需要按照日志類型的不同,將不同種類的日志發(fā)往不同的分析系統(tǒng)。

2)需求分析

在實(shí)際的開(kāi)發(fā)中,一臺(tái)服務(wù)器產(chǎn)生的日志類型可能有很多種,不同類型的日志可能需要發(fā)送到不同的分析系統(tǒng)。此時(shí)會(huì)用到 Flume 拓?fù)浣Y(jié)構(gòu)中的 Multiplexing 結(jié)構(gòu),Multiplexing 的原理是,根據(jù) event 中 Header 的某個(gè) key 的值,將不同的 event 發(fā)送到不同的 Channel中,所以我們需要自定義一個(gè) Interceptor,為不同類型的 event 的 Header 中的 key 賦予不同的值。

在該案例中,我們以端口數(shù)據(jù)模擬日志,以是否包含”lyh”模擬不同類型的日志,我們需要自定義 interceptor 區(qū)分?jǐn)?shù)據(jù)中是否包含”lyh”,將其分別發(fā)往不同的分析系統(tǒng)(Channel)。

?3)需求實(shí)現(xiàn)

自定義攔截器

引入 flume 依賴

<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>
package com.lyh.gmall.interceptor;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class TypeInterceptor implements Interceptor {// 存放事件集合private List<Event> addHeaderEvents;@Overridepublic void initialize() {// 初始化存放事件的集合addHeaderEvents = new ArrayList<>();}// 單個(gè)事件攔截@Overridepublic Event intercept(Event event) {// 1. 獲取事件中的 header 信息Map<String, String> headers = event.getHeaders();// 2. 獲取事件中的 body 信息String body = new String(event.getBody());// 3. 根據(jù) body 中是否包含 'lyh' 來(lái)決定發(fā)往哪個(gè) sinkif (body.contains("lyh"))headers.put("type","first");elseheaders.put("type","second");return event;}// 批量事件攔截@Overridepublic List<Event> intercept(List<Event> events) {// 1. 清空集合addHeaderEvents.clear();// 2. 遍歷 eventsfor (Event event : events) {// 3. 給每個(gè)事件添加頭信息addHeaderEvents.add(intercept(event));}return addHeaderEvents;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new TypeInterceptor();}@Overridepublic void configure(Context context) {}}
}

打包放到 flume 安裝目錄的 lib 目錄下:
?

flume 作業(yè)配置

hadoop102:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.lyh.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.first = c1 # 包含 'lyh'
a1.sources.r1.selector.mapping.second = c2 # 不包含 'lyh'# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
hadoop103:
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141a1.sinks.k1.type = loggera1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
hadoop104:
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242a1.sinks.k1.type = loggera1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

4)需求實(shí)現(xiàn)

#hadoop103
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume2.conf -Dflume.root.logger=INFO,console#hadoop104
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume3.conf -Dflume.root.logger=INFO,console#hadoop102
bin/flume-ng agent -n a1 -c conf/ -f job/group4/flume1.conf
nc localhost 44444

hadoop102:

hadoop103:

hadoop104:?

可以看到,從 hadoop102 發(fā)送的日志中,包含 "lyh" 的都被發(fā)往 hadoop103 的 4141 端口,其它日志則被發(fā)往 hadoop104 的 4242端口。

6、自定義 Source

自定義 source 用的還是比較少的,畢竟 flume 已經(jīng)提供了很多常用的了。

1)介紹

????????Source 是負(fù)責(zé)接收數(shù)據(jù)到 Flume Agent 的組件。Source 組件可以處理各種類型、各種格式的日志數(shù)據(jù),包括 avro、thrift、exec、jms、spooling directory、netcat、sequence、generator、syslog、http、legacy。官方提供的 source 類型已經(jīng)很多,但是有時(shí)候并不能滿足實(shí)際開(kāi)發(fā)當(dāng)中的需求,此時(shí)我們就需要根據(jù)實(shí)際需求自定義某些 source。
官方也提供了自定義 source 的接口: https://flume.apache.org/FlumeDeveloperGuide.html#source 根據(jù)官方說(shuō)明自定義 MySource 需要繼承 AbstractSource 類并實(shí)現(xiàn) Configurable 和 PollableSource 接口。
實(shí)現(xiàn)相應(yīng)方法:
  • getBackOffSleepIncrement() //backoff 步長(zhǎng),當(dāng)從數(shù)據(jù)源拉取數(shù)據(jù)時(shí),拉取不到數(shù)據(jù)的話它不會(huì)一直再去拉取,而是等待,之后每一次再=如果還拉取不到,就會(huì)比上一次多等待步長(zhǎng)單位個(gè)時(shí)間。
  • getMaxBackOffSleepInterval()? //backoff 最長(zhǎng)時(shí)間,如果不設(shè)置最長(zhǎng)等待時(shí)間,它最終會(huì)無(wú)限等待,所以需要指定。
  • configure(Context context)? //初始化 context(讀取配置文件內(nèi)容)
  • process()? //獲取數(shù)據(jù)封裝成 event 并寫(xiě)入 channel,這個(gè)方法將被循環(huán)調(diào)用。
使用場(chǎng)景:讀取 MySQL 數(shù)據(jù)或者其他文件系統(tǒng)。

2)需求

使用 flume 接收數(shù)據(jù),并給每條數(shù)據(jù)添加前綴,輸出到控制臺(tái)。前綴可從 flume 配置文
件中配置。

3)分析

4)需求實(shí)現(xiàn)

代碼

package com.lyh.source;import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;import java.util.HashMap;
import java.util.Map;public class MySource extends AbstractSource implements Configurable, PollableSource {// 定義配置文件將來(lái)要讀取的字段private Long delay;private String field;@Overridepublic Status process() throws EventDeliveryException {try {// 創(chuàng)建事件頭信息Map<String,String> headerMap = new HashMap<>();// 創(chuàng)建事件SimpleEvent event = new SimpleEvent();// 循環(huán)封裝事件for (int i = 0; i < 5; i++) {// 給事件設(shè)置頭信息event.setHeaders(headerMap);// 給事件設(shè)置內(nèi)容event.setBody((field + i).getBytes());// 將事件寫(xiě)入 channelgetChannelProcessor().processEvent(event);Thread.sleep(delay);}} catch (InterruptedException e) {e.printStackTrace();}return Status.READY;}// 步長(zhǎng)@Overridepublic long getBackOffSleepIncrement() {return 0;}// 最大間隔時(shí)間@Overridepublic long getMaxBackOffSleepInterval() {return 0;}// 初始化配置信息@Overridepublic void configure(Context context) {delay = context.getLong("delay");field = context.getString("field","Hello");}
}

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = com.lyh.source.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = lyh# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent -n a1 -c conf/ -f job/custom-source.conf -Dflume.root.logger=INFO,console

運(yùn)行結(jié)果:?

7、自定義 Sink

1)介紹

????????Sink 不斷地輪詢 Channel 中的事件且批量地移除它們,并將這些事件批量寫(xiě)入到存儲(chǔ)或索引系統(tǒng)、或者被發(fā)送到另一個(gè) Flume Agent。
????????Sink 是完全事務(wù)性的。在從 Channel 批量刪除數(shù)據(jù)之前,每個(gè) Sink 用 Channel 啟動(dòng)一個(gè)事務(wù)。批量事件一旦成功寫(xiě)出到存儲(chǔ)系統(tǒng)或下一個(gè) Flume Agent,Sink 就利用 Channel 提交事務(wù)。事務(wù)一旦被提交,該 Channel 從自己的內(nèi)部緩沖區(qū)刪除事件。
????????Sink 組件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、 自定義。官方提供的 Sink 類型已經(jīng)很多,但是有時(shí)候并不能滿足實(shí)際開(kāi)發(fā)當(dāng)中的需求,此時(shí)我們就需要根據(jù)實(shí)際需求自定義某些 Sink。
????????官方也提供了自定義 sink 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink 根據(jù)官方說(shuō)明自定義 MySink 需要繼承 AbstractSink 類并實(shí)現(xiàn) Configurable 接口。實(shí)現(xiàn)相應(yīng)方法:
  • configure(Context context)//初始化 context(讀取配置文件內(nèi)容)
  • process()//從 Channel 讀取獲取數(shù)據(jù)(event),這個(gè)方法將被循環(huán)調(diào)用。
使用場(chǎng)景:讀取 Channel 數(shù)據(jù)寫(xiě)入 MySQL 或者其他文件系統(tǒng)。

2)需求分析

使用 flume 接收數(shù)據(jù),并在 Sink 端給每條數(shù)據(jù)添加前綴和后綴,輸出到控制臺(tái)。前后綴可在 flume 任務(wù)配置文件中配置。
流程分析:

?3)需求實(shí)現(xiàn)

package com.lyh.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable{private final static Logger LOG = LoggerFactory.getLogger(AbstractSink.class);private String prefix;private String suffix;@Overridepublic Status process() throws EventDeliveryException {// 聲明返回值狀態(tài)信息Status status;// 獲取當(dāng)前 sink 綁定的 channelChannel channel = getChannel();// 獲取事務(wù)Transaction txn = channel.getTransaction();// 聲明事件Event event;// 開(kāi)啟事務(wù)txn.begin();// 讀取 channel 中的事件、直到讀取事件結(jié)束循環(huán)while (true){event = channel.take();if (event!=null) break;}try {// 打印事件LOG.info(prefix + new String(event.getBody()) + suffix);// 事務(wù)提交txn.commit();status = Status.READY;}catch (Exception e){// 遇到異?;貪L事務(wù)txn.rollback();status = Status.BACKOFF;}finally {// 關(guān)閉事務(wù)txn.close();}return null;}// 初始化配置信息@Overridepublic void configure(Context context) {// 帶默認(rèn)值prefix = context.getString("prefix","hello");// 不帶默認(rèn)值suffix = context.getString("suffix");}
}

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = com.atguigu.MySink
a1.sinks.k1.prefix = lyh:
a1.sinks.k1.suffix = :lyh# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4)測(cè)試

bin/flume-ng agent -n a1 -c conf/ -f job/custom-sink.conf -Dflume.root.logger=INFO,console

運(yùn)行結(jié)果:

總結(jié)

? ? ? ? 自此,flume 的學(xué)習(xí)基本也完了,這一篇雖然不多但也用了大概3天時(shí)間。相比較 kafka、flink,flume 這個(gè)框架還是非常簡(jiǎn)單的,比如我們自己實(shí)現(xiàn)一些 source、sink,都是很簡(jiǎn)單的,沒(méi)有太多復(fù)雜的理解的東西。

? ? ? ? 總之 flume 這個(gè)工具還是多看官網(wǎng)。

http://www.risenshineclean.com/news/34030.html

相關(guān)文章:

  • 成都網(wǎng)站建設(shè)網(wǎng)絡(luò)營(yíng)銷的特點(diǎn)和優(yōu)勢(shì)
  • 建設(shè)部投訴網(wǎng)站提高搜索引擎排名
  • jsp做的網(wǎng)站如何查看站長(zhǎng)資源平臺(tái)
  • 湖北宜昌網(wǎng)絡(luò)科技有限公司優(yōu)化課程設(shè)置
  • 現(xiàn)在最流行的網(wǎng)站推廣方式有哪些谷歌seo是什么
  • 怎樣設(shè)計(jì)自己網(wǎng)站域名不要手賤搜這15個(gè)關(guān)鍵詞
  • seo網(wǎng)站分析報(bào)告百度置頂廣告多少錢
  • 企業(yè)建設(shè)網(wǎng)站項(xiàng)目背景線下宣傳渠道和宣傳方式
  • 用服務(wù)器ip可以做網(wǎng)站嗎百度seo在線優(yōu)化
  • 專業(yè)的設(shè)計(jì)網(wǎng)站有哪些中國(guó)站免費(fèi)推廣入口
  • 做網(wǎng)站國(guó)家大學(xué)科技園鄭州網(wǎng)絡(luò)建設(shè)推廣
  • 哪些網(wǎng)站是java開(kāi)發(fā)的優(yōu)化關(guān)鍵詞的方法有哪些
  • 沈陽(yáng)市建設(shè)工程項(xiàng)目管理中心網(wǎng)站優(yōu)化大師官網(wǎng)
  • 做的比較好的手機(jī)網(wǎng)站東營(yíng)網(wǎng)站推廣公司
  • 做技術(shù)網(wǎng)站在背景圖產(chǎn)品推廣步驟
  • 有人說(shuō)做網(wǎng)站賭上海培訓(xùn)機(jī)構(gòu)整頓
  • 星沙網(wǎng)站制作網(wǎng)上宣傳廣告怎么做
  • 揭陽(yáng)手機(jī)網(wǎng)站建設(shè) 今日頭條
  • 做cpa搭建哪個(gè)網(wǎng)站比較好永久免費(fèi)制作網(wǎng)頁(yè)
  • 中國(guó)有色金屬建設(shè)股份有限公司網(wǎng)站seoheuni
  • 做網(wǎng)站的公司哪家最好廈門最快seo
  • 中拓網(wǎng)絡(luò)科技有限公司北京seo不到首頁(yè)不扣費(fèi)
  • 廣州最新發(fā)布最新百度seo新站優(yōu)化
  • 一臺(tái)云服務(wù)器做多個(gè)網(wǎng)站惠州seo外包服務(wù)
  • 成都網(wǎng)站建設(shè)名錄海南seo排名優(yōu)化公司
  • crm與scrmseo短視頻網(wǎng)頁(yè)入口引流網(wǎng)站
  • linux系統(tǒng)網(wǎng)站空間正規(guī)seo關(guān)鍵詞排名哪家專業(yè)
  • 展示網(wǎng)站動(dòng)畫(huà)怎么做的站長(zhǎng)工具seo綜合查詢官網(wǎng)
  • 怎么自己做網(wǎng)站的步驟百度推廣是做什么的
  • 網(wǎng)站刷新新前臺(tái)是什么意思2345網(wǎng)址中國(guó)最好