佛山網(wǎng)站設(shè)計(jì)多少錢而的跟地seo排名點(diǎn)擊軟件
文章作者郵箱:yugongshiye@sina.cn? ? ? ? ? ? ? 地址:廣東惠州
?▲ 本章節(jié)目的
??了解Interceptor的概念和配置參數(shù);
??掌握Interceptor的使用方法;
??掌握Interceptor的Host Interceptor;
??掌握Interceptor的Static Interceptor;
??掌握Interceptor的UUID Interceptor;
??掌握Interceptor的Search And Replace Interceptor;
??掌握Interceptor的Regex Filtering Interceptor;
??掌握Interceptor的Custom Interceptor;
一、Timestamp Interceptor
1.?概述
1. Timestamp Interceptor是在headers中來添加一個(gè)timestamp字段來標(biāo)記數(shù)據(jù)被收集的時(shí)間。
2. Timestamp Interceptor結(jié)合HDFS Sink可以實(shí)現(xiàn)數(shù)據(jù)按天存儲(chǔ)。
2.?配置屬性
屬性 | 解釋 |
type | timestamp |
3.?案例
1. 編寫格式文件,添加如下內(nèi)容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 給Interceptor起名
a1.sources.s1.interceptors = i1
# 指定Timestamp Interceptor
a1.sources.s1.interceptors.i1.type = timestamp
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2.?啟動(dòng)Flume:
../bin/flume-ng agent -n a1 -c ../conf -f in.conf -
Dflume.root.logger=INFO,console
4.?數(shù)據(jù)按天存放
1. 編寫格式文件,添加如下內(nèi)容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = hadoop01
a1.sources.s1.port = 8090
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = timestamp
a1.channels.c1.type = memory
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flumedata/date=%Y-%m-%d
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2.?啟動(dòng)Flume:
../bin/flume-ng agent -n a1 -c ../conf -f hdfsin.conf -
Dflume.root.logger=INFO,console
二、Host Interceptor
1.?概述
1. Host Interceptor是在headers中添加一個(gè)字段host。
2. Host Interceptor可以用于標(biāo)記數(shù)據(jù)來源于哪一臺(tái)主機(jī)。
2.?配置屬性
屬性 | 解釋 |
type | 必須是host |
3.?案例
1. 編寫格式文件,添加如下內(nèi)容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 給Interceptor起名
a1.sources.s1.interceptors = i1 i2
# 指定Timestamp Interceptor
a1.sources.s1.interceptors.i1.type = timestamp
# 指定Host Interceptor
a1.sources.s1.interceptors.i2.type = host
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2.?啟動(dòng)Flume:
../bin/flume-ng agent -n a1 -c ../conf -f in.conf -
Dflume.root.logger=INFO,console
三、Static Interceptor
1.?概述
1. Static Interceptor是在headers中添加指定字段。
2. 可以利用這個(gè)Interceptor來標(biāo)記數(shù)據(jù)的類型。
2.?配置屬性
屬性 | 解釋 |
type | 必須是static |
key | 指定在headers中的字段名 |
value | 指定在headers中的字段值 |
3.?案例
1. 編寫格式文件,添加如下內(nèi)容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 給Interceptor起名
a1.sources.s1.interceptors = i1 i2 i3
# 指定Timestamp Interceptor
a1.sources.s1.interceptors.i1.type = timestamp
# 指定Host Interceptor
a1.sources.s1.interceptors.i2.type = host
# 指定Static Interceptor
a1.sources.s1.interceptors.i3.type = static
a1.sources.s1.interceptors.i3.key = kind
a1.sources.s1.interceptors.i3.value = log
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2.?啟動(dòng)Flume:
../bin/flume-ng agent -n a1 -c ../conf -f in.conf -
Dflume.root.logger=INFO,console
四、UUID Interceptor
1.?概述
1. UUID Interceptor是在headers中添加一個(gè)id字段。
2. 可以用于標(biāo)記數(shù)據(jù)的唯一性。
2.?配置屬性
屬性 | 解釋 |
type | 必須是org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder |
3.?案例
1. 編寫格式文件,添加如下內(nèi)容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 給Interceptor起名
a1.sources.s1.interceptors = i1 i2 i3 i4
# 指定Timestamp Interceptor
a1.sources.s1.interceptors.i1.type = timestamp
# 指定Host Interceptor
a1.sources.s1.interceptors.i2.type = host
# 指定Static Interceptor
a1.sources.s1.interceptors.i3.type = static
a1.sources.s1.interceptors.i3.key = kind
a1.sources.s1.interceptors.i3.value = log
# 指定UUID Interceptor
a1.sources.s1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2.?啟動(dòng)Flume:
../bin/flume-ng agent -n a1 -c ../conf -f in.conf -
Dflume.root.logger=INFO,console
五、Search And Replace Interceptor
1.?概述
1. Search And Replace Interceptor在使用的時(shí)候,需要指定正則表達(dá)式,會(huì)根據(jù)正則表達(dá)式的規(guī)則,將符合正則表達(dá)式的數(shù)據(jù)替換為指定形式的數(shù)據(jù)。
2. 在替換的時(shí)候,不會(huì)替換headers中的數(shù)據(jù),而是會(huì)替換body中的數(shù)據(jù)。
2.?配置屬性
屬性 | 解釋 |
type | 必須是search_replace |
searchPattern | 指定要匹配的正則形式 |
replaceString | 指定要替換的字符串 |
3.?案例
1. 編寫格式文件,添加如下內(nèi)容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = http
a1.sources.s1.port = 8090
# 給攔截器起名
a1.sources.s1.interceptors = i1
# 指定類型
a1.sources.s1.interceptors.i1.type = search_replace
a1.sources.s1.interceptors.i1.searchPattern = [0-9]
a1.sources.s1.interceptors.i1.replaceString = *
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2.?啟動(dòng)Flume:
../bin/flume-ng agent -n a1 -c ../conf -f searchin.conf -
Dflume.root.logger=INFO,console
六、Regex Filtering Interceptor
1.?概述
1. Regex Filtering Interceptor在使用的時(shí)候需要指定正則表達(dá)式。
2. 屬性excludeEvents的值如果不指定,默認(rèn)是false。
3. 如果沒有配置excludeEvents的值或者配置excludeEvents的值配置為false,則只有符合正則表達(dá)式的數(shù)據(jù)會(huì)留下來,其他不符合正則表達(dá)式的數(shù)據(jù)會(huì)被過濾掉;如果excludeEvents的值,那么符合正則表達(dá)式的數(shù)據(jù)會(huì)被過濾掉,其他的數(shù)據(jù)則會(huì)被留下來。
2.?配置屬性
屬性 | 解釋 |
type | 必須是regex_filter |
regex | 指定正則表達(dá)式 |
excludeEvents | true或者false |
3.?案例
1. 編寫格式文件,添加如下內(nèi)容:
# 定義 數(shù)據(jù)源(輸入端) 緩沖區(qū) 輸出源(輸出端)
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 輸入端
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/upload
a1.sources.r1.fileSuffix = .done
# 攔截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
#全部都是符合條件的數(shù)據(jù)
a1.sources.r1.interceptors.i1.regex = ^.*INFO.*$
#排除符合正則表達(dá)式的數(shù)據(jù)
# a1.sources.r1.interceptors.i1.excludeEvents = true
# 輸出端
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://flume45:9000/interceptors/%Y%m%d/%H
#是否使用本地時(shí)間戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 序列化
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 0
# 使用一個(gè)在內(nèi)存中緩沖事件的通道
a1.channels.c1.type = memory
# 連接通道
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.?啟動(dòng)Flume:
../bin/flume-ng agent -n a1 -c ../conf -f regexin.conf -
Dflume.root.logger=INFO,console
七、Custom Interceptor
1.?概述
1. 在Flume中,也允許自定義攔截器。但是不同于其他組件,自定義Interceptor的時(shí)候,需要再額外覆蓋其中的內(nèi)部接口。
2. 步驟:
a. 構(gòu)建Maven工程,導(dǎo)入對應(yīng)的依賴。
b. 自定義一個(gè)類實(shí)現(xiàn)Interceptor接口,覆蓋其中initialize,intercept和close方法。
c. 定義靜態(tài)內(nèi)部類,實(shí)現(xiàn)Interceptor.Builder內(nèi)部接口。
d. 打成jar包方法Flume安裝目錄的lib目錄下。
e. 編寫格式文件,添加如下內(nèi)容:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
# 指定攔截器
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = cn.tedu.flume.interceptor.AuthInterceptor$Builder
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
f.?啟動(dòng)Flume:
../bin/flume-ng agent -n a1 -c ../conf -f authin.conf -
Dflume.root.logger=INFO,console