各種瀏覽器網(wǎng)站大全淘寶新店怎么快速做起來(lái)
?????????????????????? 星光下的趕路人star的個(gè)人主頁(yè)
??????????????????????我的敵手就是我自己,我要他美好到能使我滿意的程度
文章目錄
- 1、處理函數(shù)
- 1.1 基本處理函數(shù)(ProcessFunction)
- 1.1.1 處理函數(shù)的功能和使用
- 1.1.2 ProcessFunction解析
- 1.1.3 處理函數(shù)的分類
- 1.2 按鍵分區(qū)處理函數(shù)(KeyedProcessFunction)
- 1.2.1 定時(shí)器(Timer)和定時(shí)服務(wù)(TimeService)
- 1.2.2 KeyedProcessFunction案例
- 1.3 窗口處理函數(shù)
- 1.3.1 窗口處理函數(shù)的使用
- 1.3.2 ProcessWindowFunction解析
- 1.4 應(yīng)用案例---topN
- 1.4.1 使用ProcessAllWindowFunction
- 1.4.2 使用KeyedProcessFunction
- 1.5 側(cè)輸出流(Side Output)
1、處理函數(shù)
之前所介紹的流處理API,無(wú)論是基本的轉(zhuǎn)換、聚合,還是更為復(fù)雜的窗口操作,其實(shí)都是基于DataStream進(jìn)行轉(zhuǎn)換的,所以可以統(tǒng)稱為DataStream API。
在Flink更底層,我們可以不定義任何具體的算子(比如map,filter,或者window),而只是提煉出一個(gè)統(tǒng)一的“處理”(process)操作——它是所有轉(zhuǎn)換算子的一個(gè)概括性的表達(dá),可以自定義處理邏輯,所以這一層接口就被叫作“處理函數(shù)”(process function)。
1.1 基本處理函數(shù)(ProcessFunction)
1.1.1 處理函數(shù)的功能和使用
我們之前學(xué)習(xí)的轉(zhuǎn)換算子,一般只是針對(duì)某種具體操作來(lái)定義的,能夠拿到的信息比較有限。如果我們想要訪問(wèn)事件的時(shí)間戳,或者當(dāng)前的水位線信息,都是完全做不到的。跟時(shí)間相關(guān)的操作,目前我們只會(huì)用窗口來(lái)處理。而在很多應(yīng)用需求中,要求我們對(duì)時(shí)間有更精細(xì)的控制,需要能夠獲取水位線,甚至要“把控時(shí)間”、定義什么時(shí)候做什么事,這就不是基本的時(shí)間窗口能夠?qū)崿F(xiàn)的了。
這時(shí)就需要使用底層的處理函數(shù)。處理函數(shù)提供了一個(gè)“定時(shí)服務(wù)”(TimerService),我們可以通過(guò)它訪問(wèn)流中的事件(event)、時(shí)間戳(timestamp)、水位線(watermark),甚至可以注冊(cè)“定時(shí)事件”。而且處理函數(shù)繼承了AbstractRichFunction抽象類,所以擁有富函數(shù)類的所有特性,同樣可以訪問(wèn)狀態(tài)(state)和其他運(yùn)行時(shí)信息。此外,處理函數(shù)還可以直接將數(shù)據(jù)輸出到側(cè)輸出流(side output)中。所以,處理函數(shù)是最為靈活的處理方法,可以實(shí)現(xiàn)各種自定義的業(yè)務(wù)邏輯。
處理函數(shù)的使用與基本的轉(zhuǎn)換操作類似,只需要直接基于DataStream調(diào)用.process()方法就可以了。方法需要傳入一個(gè)ProcessFunction作為參數(shù),用來(lái)定義處理邏輯。
stream.process(new MyProcessFunction()
這里ProcessFunction不是接口,而是一個(gè)抽象類,繼承了AbstractRichFunction;MyProcessFunction是它的一個(gè)具體實(shí)現(xiàn)。所以所有的處理函數(shù),都是富函數(shù)(RichFunction),富函數(shù)可以調(diào)用的東西這里同樣都可以調(diào)用。
1.1.2 ProcessFunction解析
在源碼中我們可以看到,抽象類ProcessFunction繼承了AbstractRichFunction,有兩個(gè)泛型類型參數(shù):I表示Input,也就是輸入的數(shù)據(jù)類型;O表示Output,也就是處理完成之后輸出的數(shù)據(jù)類型。
內(nèi)部單獨(dú)定義了兩個(gè)方法:一個(gè)是必須要實(shí)現(xiàn)的抽象方法.processElement();另一個(gè)是非抽象方法.onTimer()。
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...}
1、抽象方法processElement()
用于“處理元素”,定義了處理的核心邏輯。這個(gè)方法對(duì)于流中的每個(gè)元素都會(huì)調(diào)用一次,參數(shù)包括三個(gè):輸入數(shù)據(jù)值value,上下文ctx,以及“收集器”(Collector)out。方法沒(méi)有返回值,處理之后的輸出數(shù)據(jù)是通過(guò)收集器out來(lái)定義的。
-value:當(dāng)前流中的輸入元素,也就是正在處理的數(shù)據(jù),類型和流中數(shù)據(jù)類型一致。
- ctx:類型是ProcessFunction中定義的內(nèi)部抽象類Context,表示當(dāng)前運(yùn)行的上下文,可以獲取到當(dāng)前的時(shí)間戳,并提供了用于查詢時(shí)間和注冊(cè)定時(shí)器的“定時(shí)服務(wù)”(TimerService),以及可以將輸出發(fā)送到“側(cè)輸出流”(side output)的方法output()。
- out :“收集器”(類型為Collector),用于返回輸出數(shù)據(jù)。使用方式與flatMap算子中的收集器完全一樣,直接調(diào)用out.collect()方法就可以向下游發(fā)出一個(gè)數(shù)據(jù)。這個(gè)方法可以多次調(diào)用,也可以不調(diào)用。
通過(guò)幾個(gè)參數(shù)的分析不難發(fā)現(xiàn),ProcessFunction可以輕松實(shí)現(xiàn)flatMap、map、filter這樣的基本轉(zhuǎn)換功能;而通過(guò)富函數(shù)提供的獲取上下文方法.getRuntimeContext(),也可以自定義狀態(tài)(state)進(jìn)行處理,這也就能實(shí)現(xiàn)聚合操作的功能了。
2、非抽象方法onTimer()
這個(gè)方法只有在注冊(cè)好的定時(shí)器觸發(fā)的時(shí)候才會(huì)調(diào)用,而定時(shí)器是通過(guò)“定時(shí)服務(wù)”TimerService來(lái)注冊(cè)的。打個(gè)比方,注冊(cè)定時(shí)器(timer)就是設(shè)了一個(gè)鬧鐘,到了設(shè)定時(shí)間就會(huì)響;而.onTimer()中定義的,就是鬧鐘響的時(shí)候要做的事。所以它本質(zhì)上是一個(gè)基于時(shí)間的“回調(diào)”(callback)方法,通過(guò)時(shí)間的進(jìn)展來(lái)觸發(fā);在事件時(shí)間語(yǔ)義下就是由水位線(watermark)來(lái)觸發(fā)了。
定時(shí)方法.onTimer()也有三個(gè)參數(shù):時(shí)間戳(timestamp),上下文(ctx),以及收集器(out)。這里的timestamp是指設(shè)定好的觸發(fā)時(shí)間,事件時(shí)間語(yǔ)義下當(dāng)然就是水位線了。另外這里同樣有上下文和收集器,所以也可以調(diào)用定時(shí)服務(wù)(TimerService),以及任意輸出處理之后的數(shù)據(jù)。
既然有.onTimer()方法做定時(shí)觸發(fā),我們用ProcessFunction也可以自定義數(shù)據(jù)按照時(shí)間分組、定時(shí)觸發(fā)計(jì)算輸出結(jié)果;這其實(shí)就實(shí)現(xiàn)了窗口(window)的功能。所以說(shuō)ProcessFunction其實(shí)可以實(shí)現(xiàn)一切功能。
注意:在Flink中,只有“按鍵分區(qū)流”KeyedStream才支持設(shè)置定時(shí)器的操作。
1.1.3 處理函數(shù)的分類
我們知道,DataStream在調(diào)用一些轉(zhuǎn)換方法之后,有可能生成新的流類型;例如調(diào)用.keyBy()之后得到KeyedStream,進(jìn)而再調(diào)用.window()之后得到WindowedStream。對(duì)于不同類型的流,其實(shí)都可以直接調(diào)用.process()方法進(jìn)行自定義處理,這時(shí)傳入的參數(shù)就都叫作處理函數(shù)。當(dāng)然,它們盡管本質(zhì)相同,都是可以訪問(wèn)狀態(tài)和時(shí)間信息的底層API,可彼此之間也會(huì)有所差異。
Flink提供了8個(gè)不同的處理函數(shù):
(1)ProcessFunction
最基本的處理函數(shù),基于DataStream直接調(diào)用.process()時(shí)作為參數(shù)傳入。
(2)KeyedProcessFunction
對(duì)流按鍵分區(qū)后的處理函數(shù),基于KeyedStream調(diào)用.process()時(shí)作為參數(shù)傳入。要想使用定時(shí)器,比如基于KeyedStream。
(3)ProcessWindowFunction
開(kāi)窗之后的處理函數(shù),也是全窗口函數(shù)的代表?;赪indowedStream調(diào)用.process()時(shí)作為參數(shù)傳入。
(4)ProcessAllWindowFunction
同樣是開(kāi)窗之后的處理函數(shù),基于AllWindowedStream調(diào)用.process()時(shí)作為參數(shù)傳入。
(5)CoProcessFunction
合并(connect)兩條流之后的處理函數(shù),基于ConnectedStreams調(diào)用.process()時(shí)作為參數(shù)傳入。關(guān)于流的連接合并操作,我們會(huì)在后續(xù)章節(jié)詳細(xì)介紹。
(6)ProcessJoinFunction
間隔連接(interval join)兩條流之后的處理函數(shù),基于IntervalJoined調(diào)用.process()時(shí)作為參數(shù)傳入。
(7)BroadcastProcessFunction
廣播連接流處理函數(shù),基于BroadcastConnectedStream調(diào)用.process()時(shí)作為參數(shù)傳入。這里的“廣播連接流”BroadcastConnectedStream,是一個(gè)未keyBy的普通DataStream與一個(gè)廣播流(BroadcastStream)做連接(conncet)之后的產(chǎn)物。關(guān)于廣播流的相關(guān)操作,我們會(huì)在后續(xù)章節(jié)詳細(xì)介紹。
(8)KeyedBroadcastProcessFunction
按鍵分區(qū)的廣播連接流處理函數(shù),同樣是基于BroadcastConnectedStream調(diào)用.process()時(shí)作為參數(shù)傳入。與BroadcastProcessFunction不同的是,這時(shí)的廣播連接流,是一個(gè)KeyedStream與廣播流(BroadcastStream)做連接之后的產(chǎn)物。
1.2 按鍵分區(qū)處理函數(shù)(KeyedProcessFunction)
在上節(jié)中提到,只有在KeyedStream中才支持使用TimerService設(shè)置定時(shí)器的操作。所以一般情況下,我們都是先做了keyBy分區(qū)之后,再去定義處理操作;代碼中更加常見(jiàn)的處理函數(shù)是KeyedProcessFunction。
1.2.1 定時(shí)器(Timer)和定時(shí)服務(wù)(TimeService)
在.onTimer()方法中可以實(shí)現(xiàn)定時(shí)處理的邏輯,而它能觸發(fā)的前提,就是之前曾經(jīng)注冊(cè)過(guò)定時(shí)器、并且現(xiàn)在已經(jīng)到了觸發(fā)時(shí)間。注冊(cè)定時(shí)器的功能,是通過(guò)上下文中提供的“定時(shí)服務(wù)”來(lái)實(shí)現(xiàn)的。
定時(shí)服務(wù)與當(dāng)前運(yùn)行的環(huán)境有關(guān)。前面已經(jīng)介紹過(guò),ProcessFunction的上下文(Context)中提供了.timerService()方法,可以直接返回一個(gè)TimerService對(duì)象。TimerService是Flink關(guān)于時(shí)間和定時(shí)器的基礎(chǔ)服務(wù)接口,包含以下六個(gè)方法:
// 獲取當(dāng)前的處理時(shí)間
long currentProcessingTime();// 獲取當(dāng)前的水位線(事件時(shí)間)
long currentWatermark();// 注冊(cè)處理時(shí)間定時(shí)器,當(dāng)處理時(shí)間超過(guò)time時(shí)觸發(fā)
void registerProcessingTimeTimer(long time);// 注冊(cè)事件時(shí)間定時(shí)器,當(dāng)水位線超過(guò)time時(shí)觸發(fā)
void registerEventTimeTimer(long time);// 刪除觸發(fā)時(shí)間為time的處理時(shí)間定時(shí)器
void deleteProcessingTimeTimer(long time);// 刪除觸發(fā)時(shí)間為time的處理時(shí)間定時(shí)器
void deleteEventTimeTimer(long time);
六個(gè)方法可以分成兩大類:基于處理時(shí)間和基于事件時(shí)間。而對(duì)應(yīng)的操作主要有三個(gè):獲取當(dāng)前時(shí)間,注冊(cè)定時(shí)器,以及刪除定時(shí)器。需要注意,盡管處理函數(shù)中都可以直接訪問(wèn)TimerService,不過(guò)只有基于KeyedStream的處理函數(shù),才能去調(diào)用注冊(cè)和刪除定時(shí)器的方法;未作按鍵分區(qū)的DataStream不支持定時(shí)器操作,只能獲取當(dāng)前時(shí)間。
TimerService會(huì)以鍵(key)和時(shí)間戳為標(biāo)準(zhǔn),對(duì)定時(shí)器進(jìn)行去重;也就是說(shuō)對(duì)于每個(gè)key和時(shí)間戳,最多只有一個(gè)定時(shí)器,如果注冊(cè)了多次,onTimer()方法也將只被調(diào)用一次。
1.2.2 KeyedProcessFunction案例
基于keyBy之后的KeyedStream,直接調(diào)用.process()方法,這時(shí)需要傳入的參數(shù)就是KeyedProcessFunction的實(shí)現(xiàn)類。
public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {/*** 來(lái)一條數(shù)據(jù)調(diào)用一次* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//獲取當(dāng)前數(shù)據(jù)的keyString currentKey = ctx.getCurrentKey();// TODO 1.定時(shí)器注冊(cè)TimerService timerService = ctx.timerService();// 1、事件時(shí)間的案例Long currentEventTime = ctx.timestamp(); // 數(shù)據(jù)中提取出來(lái)的事件時(shí)間timerService.registerEventTimeTimer(5000L);System.out.println("當(dāng)前key=" + currentKey + ",當(dāng)前時(shí)間=" + currentEventTime + ",注冊(cè)了一個(gè)5s的定時(shí)器");// 2、處理時(shí)間的案例
// long currentTs = timerService.currentProcessingTime();
// timerService.registerProcessingTimeTimer(currentTs + 5000L);
// System.out.println("當(dāng)前key=" + currentKey + ",當(dāng)前時(shí)間=" + currentTs + ",注冊(cè)了一個(gè)5s后的定時(shí)器");// 3、獲取 process的 當(dāng)前watermark
// long currentWatermark = timerService.currentWatermark();
// System.out.println("當(dāng)前數(shù)據(jù)=" + value + ",當(dāng)前watermark=" + currentWatermark);// 注冊(cè)定時(shí)器: 處理時(shí)間、事件時(shí)間
// timerService.registerProcessingTimeTimer();
// timerService.registerEventTimeTimer();// 刪除定時(shí)器: 處理時(shí)間、事件時(shí)間
// timerService.deleteEventTimeTimer();
// timerService.deleteProcessingTimeTimer();// 獲取當(dāng)前時(shí)間進(jìn)展: 處理時(shí)間-當(dāng)前系統(tǒng)時(shí)間, 事件時(shí)間-當(dāng)前watermark
// long currentTs = timerService.currentProcessingTime();
// long wm = timerService.currentWatermark();}/*** TODO 2.時(shí)間進(jìn)展到定時(shí)器注冊(cè)的時(shí)間,調(diào)用該方法* @param timestamp 當(dāng)前時(shí)間進(jìn)展,就是定時(shí)器被觸發(fā)時(shí)的時(shí)間* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey = ctx.getCurrentKey();System.out.println("key=" + currentKey + "現(xiàn)在時(shí)間是" + timestamp + "定時(shí)器觸發(fā)");}});process.print();env.execute();}
}
1.3 窗口處理函數(shù)
除了KeyedProcessFunction,另外一大類常用的處理函數(shù),就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。我們之前已經(jīng)簡(jiǎn)單地使用過(guò)窗口處理函數(shù)了。
1.3.1 窗口處理函數(shù)的使用
進(jìn)行窗口計(jì)算,我們可以直接調(diào)用現(xiàn)成的簡(jiǎn)單聚合方法(sum/max/min),也可以通過(guò)調(diào)用.reduce()或.aggregate()來(lái)自定義一般的增量聚合函數(shù)(ReduceFunction/AggregateFucntion);而對(duì)于更加復(fù)雜、需要窗口信息和額外狀態(tài)的一些場(chǎng)景,我們還可以直接使用全窗口函數(shù)、把數(shù)據(jù)全部收集保存在窗口內(nèi),等到觸發(fā)窗口計(jì)算時(shí)再統(tǒng)一處理。窗口處理函數(shù)就是一種典型的全窗口函數(shù)。
窗口處理函數(shù)ProcessWindowFunction的使用與其他窗口函數(shù)類似,也是基于WindowedStream直接調(diào)用方法就可以,只不過(guò)這時(shí)調(diào)用的是.process()。
stream.keyBy( t -> t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())
1.3.2 ProcessWindowFunction解析
ProcessWindowFunction既是處理函數(shù)又是全窗口函數(shù)。從名字上也可以推測(cè)出,它的本質(zhì)似乎更傾向于“窗口函數(shù)”一些。事實(shí)上它的用法也確實(shí)跟其他處理函數(shù)有很大不同。我們可以從源碼中的定義看到這一點(diǎn):
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {...public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;public void clear(Context context) throws Exception {}public abstract class Context implements java.io.Serializable {...}
}
ProcessWindowFunction依然是一個(gè)繼承了AbstractRichFunction的抽象類,它有四個(gè)類型參數(shù):
- IN:input,數(shù)據(jù)流中窗口任務(wù)的輸入數(shù)據(jù)類型。
- OUT:output,窗口任務(wù)進(jìn)行計(jì)算之后的輸出數(shù)據(jù)類型。
- KEY:數(shù)據(jù)中鍵key的類型。
- W:窗口的類型,是Window的子類型。一般情況下我們定義時(shí)間窗口,W就是TimeWindow。
ProcessWindowFunction里面處理數(shù)據(jù)的核心方法.process()。方法包含四個(gè)參數(shù)。 - key:窗口做統(tǒng)計(jì)計(jì)算基于的鍵,也就是之前keyBy用來(lái)分區(qū)的字段。
- context:當(dāng)前窗口進(jìn)行計(jì)算的上下文,它的類型就是ProcessWindowFunction內(nèi)部定義的抽象類Context。
- elements:窗口收集到用來(lái)計(jì)算的所有數(shù)據(jù),這是一個(gè)可迭代的集合類型。
-out:用來(lái)發(fā)送數(shù)據(jù)輸出計(jì)算結(jié)果的收集器,類型為Collector。
可以明顯看出,這里的參數(shù)不再是一個(gè)輸入數(shù)據(jù),而是窗口中所有數(shù)據(jù)的集合。而上下文context所包含的內(nèi)容也跟其他處理函數(shù)有所差別:
public abstract class Context implements java.io.Serializable {public abstract W window();public abstract long currentProcessingTime();public abstract long currentWatermark();public abstract KeyedStateStore windowState();public abstract KeyedStateStore globalState();public abstract <X> void output(OutputTag<X> outputTag, X value);}
除了可以通過(guò).output()方法定義側(cè)輸出流不變外,其他部分都有所變化。這里不再持有TimerService對(duì)象,只能通過(guò)currentProcessingTime()和currentWatermark()來(lái)獲取當(dāng)前時(shí)間,所以失去了設(shè)置定時(shí)器的功能;另外由于當(dāng)前不是只處理一個(gè)數(shù)據(jù),所以也不再提供.timestamp()方法。與此同時(shí),也增加了一些獲取其他信息的方法:比如可以通過(guò).window()直接獲取到當(dāng)前的窗口對(duì)象,也可以通過(guò).windowState()和.globalState()獲取到當(dāng)前自定義的窗口狀態(tài)和全局狀態(tài)。注意這里的“窗口狀態(tài)”是自定義的,不包括窗口本身已經(jīng)有的狀態(tài),針對(duì)當(dāng)前key、當(dāng)前窗口有效;而“全局狀態(tài)”同樣是自定義的狀態(tài),針對(duì)當(dāng)前key的所有窗口有效。
所以我們會(huì)發(fā)現(xiàn),ProcessWindowFunction中除了.process()方法外,并沒(méi)有.onTimer()方法,而是多出了一個(gè).clear()方法。從名字就可以看出,這主要是方便我們進(jìn)行窗口的清理工作。如果我們自定義了窗口狀態(tài),那么必須在.clear()方法中進(jìn)行顯式地清除,避免內(nèi)存溢出。
至于另一種窗口處理函數(shù)ProcessAllWindowFunction,它的用法非常類似。區(qū)別在于它基于的是AllWindowedStream,相當(dāng)于對(duì)沒(méi)有keyBy的數(shù)據(jù)流直接開(kāi)窗并調(diào)用.process()方法:
stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessAllWindowFunction())
1.4 應(yīng)用案例—topN
案例需求:實(shí)時(shí)統(tǒng)計(jì)一段時(shí)間內(nèi)的出現(xiàn)次數(shù)最多的水位。例如,統(tǒng)計(jì)最近10秒鐘內(nèi)出現(xiàn)次數(shù)最多的兩個(gè)水位,并且每5秒鐘更新一次。我們知道,這可以用一個(gè)滑動(dòng)窗口來(lái)實(shí)現(xiàn)。于是就需要開(kāi)滑動(dòng)窗口收集傳感器的數(shù)據(jù),按照不同的水位進(jìn)行統(tǒng)計(jì),而后匯總排序并最終輸出前兩名。這其實(shí)就是著名的“Top N”問(wèn)題。
1.4.1 使用ProcessAllWindowFunction
思路一:一種最簡(jiǎn)單的想法是,我們干脆不區(qū)分不同水位,而是將所有訪問(wèn)數(shù)據(jù)都收集起來(lái),統(tǒng)一進(jìn)行統(tǒng)計(jì)計(jì)算。所以可以不做keyBy,直接基于DataStream開(kāi)窗,然后使用全窗口函數(shù)ProcessAllWindowFunction來(lái)進(jìn)行處理。
在窗口中可以用一個(gè)HashMap來(lái)保存每個(gè)水位的出現(xiàn)次數(shù),只要遍歷窗口中的所有數(shù)據(jù),自然就能得到所有水位的出現(xiàn)次數(shù)。最后把HashMap轉(zhuǎn)成一個(gè)列表ArrayList,然后進(jìn)行排序、取出前兩名輸出就可以了。
代碼具體實(shí)現(xiàn)如下:
public class ProcessAllWindowTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));// 最近10秒= 窗口長(zhǎng)度, 每5秒輸出 = 滑動(dòng)步長(zhǎng)// TODO 思路一: 所有數(shù)據(jù)到一起, 用hashmap存, key=vc,value=count值sensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new MyTopNPAWF()).print();env.execute();}public static class MyTopNPAWF extends ProcessAllWindowFunction<WaterSensor, String, TimeWindow> {@Overridepublic void process(Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {// 定義一個(gè)hashmap用來(lái)存,key=vc,value=count值Map<Integer, Integer> vcCountMap = new HashMap<>();// 1.遍歷數(shù)據(jù), 統(tǒng)計(jì) 各個(gè)vc出現(xiàn)的次數(shù)for (WaterSensor element : elements) {Integer vc = element.getVc();if (vcCountMap.containsKey(vc)) {// 1.1 key存在,不是這個(gè)key的第一條數(shù)據(jù),直接累加vcCountMap.put(vc, vcCountMap.get(vc) + 1);} else {// 1.2 key不存在,初始化vcCountMap.put(vc, 1);}}// 2.對(duì) count值進(jìn)行排序: 利用List來(lái)實(shí)現(xiàn)排序List<Tuple2<Integer, Integer>> datas = new ArrayList<>();for (Integer vc : vcCountMap.keySet()) {datas.add(Tuple2.of(vc, vcCountMap.get(vc)));}// 對(duì)List進(jìn)行排序,根據(jù)count值 降序datas.sort(new Comparator<Tuple2<Integer, Integer>>() {@Overridepublic int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {// 降序, 后 減 前return o2.f1 - o1.f1;}});// 3.取出 count最大的2個(gè) vcStringBuilder outStr = new StringBuilder();outStr.append("================================\n");// 遍歷 排序后的 List,取出前2個(gè), 考慮可能List不夠2個(gè)的情況 ==》 List中元素的個(gè)數(shù) 和 2 取最小值for (int i = 0; i < Math.min(2, datas.size()); i++) {Tuple2<Integer, Integer> vcCount = datas.get(i);outStr.append("Top" + (i + 1) + "\n");outStr.append("vc=" + vcCount.f0 + "\n");outStr.append("count=" + vcCount.f1 + "\n");outStr.append("窗口結(jié)束時(shí)間=" + DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS") + "\n");outStr.append("================================\n");}out.collect(outStr.toString());}}
}
1.4.2 使用KeyedProcessFunction
思路二:在上一小節(jié)的實(shí)現(xiàn)過(guò)程中,我們沒(méi)有進(jìn)行按鍵分區(qū),直接將所有數(shù)據(jù)放在一個(gè)分區(qū)上進(jìn)行了開(kāi)窗操作。這相當(dāng)于將并行度強(qiáng)行設(shè)置為1,在實(shí)際應(yīng)用中是要盡量避免的,所以Flink官方也并不推薦使用AllWindowedStream進(jìn)行處理。另外,我們?cè)谌翱诤瘮?shù)中定義了HashMap來(lái)統(tǒng)計(jì)vc的出現(xiàn)次數(shù),計(jì)算過(guò)程是要先收集齊所有數(shù)據(jù)、然后再逐一遍歷更新HashMap,這顯然不夠高效。
基于這樣的想法,我們可以從兩個(gè)方面去做優(yōu)化:一是對(duì)數(shù)據(jù)進(jìn)行按鍵分區(qū),分別統(tǒng)計(jì)vc的出現(xiàn)次數(shù);二是進(jìn)行增量聚合,得到結(jié)果最后再做排序輸出。所以,我們可以使用增量聚合函數(shù)AggregateFunction進(jìn)行瀏覽量的統(tǒng)計(jì),然后結(jié)合ProcessWindowFunction排序輸出來(lái)實(shí)現(xiàn)Top N的需求。
具體實(shí)現(xiàn)可以分成兩步:先對(duì)每個(gè)vc統(tǒng)計(jì)出現(xiàn)次數(shù),然后再將統(tǒng)計(jì)結(jié)果收集起來(lái),排序輸出最終結(jié)果。由于最后的排序還是基于每個(gè)時(shí)間窗口的,輸出的統(tǒng)計(jì)結(jié)果中要包含窗口信息,我們可以輸出包含了vc、出現(xiàn)次數(shù)(count)以及窗口結(jié)束時(shí)間的Tuple3。之后先按窗口結(jié)束時(shí)間分區(qū),然后用KeyedProcessFunction來(lái)實(shí)現(xiàn)。
用KeyedProcessFunction來(lái)收集數(shù)據(jù)做排序,這時(shí)面對(duì)的是窗口聚合之后的數(shù)據(jù)流,而窗口已經(jīng)不存在了;我們需要確保能夠收集齊所有數(shù)據(jù),所以應(yīng)該在窗口結(jié)束時(shí)間基礎(chǔ)上再“多等一會(huì)兒”。具體實(shí)現(xiàn)上,可以采用一個(gè)延遲觸發(fā)的事件時(shí)間定時(shí)器。基于窗口的結(jié)束時(shí)間來(lái)設(shè)定延遲,其實(shí)并不需要等太久——因?yàn)槲覀兪强克痪€的推進(jìn)來(lái)觸發(fā)定時(shí)器,而水位線的含義就是“之前的數(shù)據(jù)都到齊了”。所以我們只需要設(shè)置1毫秒的延遲,就一定可以保證這一點(diǎn)。
而在等待過(guò)程中,之前已經(jīng)到達(dá)的數(shù)據(jù)應(yīng)該緩存起來(lái),我們這里用一個(gè)自定義的HashMap來(lái)進(jìn)行存儲(chǔ),key為窗口的標(biāo)記,value為L(zhǎng)ist。之后每來(lái)一條數(shù)據(jù),就把它添加到當(dāng)前的HashMap中,并注冊(cè)一個(gè)觸發(fā)時(shí)間為窗口結(jié)束時(shí)間加1毫秒(windowEnd + 1)的定時(shí)器。待到水位線到達(dá)這個(gè)時(shí)間,定時(shí)器觸發(fā),我們可以保證當(dāng)前窗口所有vc的統(tǒng)計(jì)結(jié)果Tuple3都到齊了;于是從HashMap中取出進(jìn)行排序輸出。
具體代碼實(shí)現(xiàn)如下:
public class KeyedProcessFunctionTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));// 最近10秒= 窗口長(zhǎng)度, 每5秒輸出 = 滑動(dòng)步長(zhǎng)/*** TODO 思路二: 使用 KeyedProcessFunction實(shí)現(xiàn)* 1、按照vc做keyby,開(kāi)窗,分別count* ==》 增量聚合,計(jì)算 count* ==》 全窗口,對(duì)計(jì)算結(jié)果 count值封裝 , 帶上 窗口結(jié)束時(shí)間的 標(biāo)簽* ==》 為了讓同一個(gè)窗口時(shí)間范圍的計(jì)算結(jié)果到一起去** 2、對(duì)同一個(gè)窗口范圍的count值進(jìn)行處理: 排序、取前N個(gè)* =》 按照 windowEnd做keyby* =》 使用process, 來(lái)一條調(diào)用一次,需要先存,分開(kāi)存,用HashMap,key=windowEnd,value=List* =》 使用定時(shí)器,對(duì) 存起來(lái)的結(jié)果 進(jìn)行 排序、取前N個(gè)*/// 1. 按照 vc 分組、開(kāi)窗、聚合(增量計(jì)算+全量打標(biāo)簽)// 開(kāi)窗聚合后,就是普通的流,沒(méi)有了窗口信息,需要自己打上窗口的標(biāo)記 windowEndSingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg = sensorDS.keyBy(sensor -> sensor.getVc()).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(new VcCountAgg(),new WindowResult());// 2. 按照窗口標(biāo)簽(窗口結(jié)束時(shí)間)keyby,保證同一個(gè)窗口時(shí)間范圍的結(jié)果,到一起去。排序、取TopNwindowAgg.keyBy(r -> r.f2).process(new TopN(2)).print();env.execute();}public static class VcCountAgg implements AggregateFunction<WaterSensor, Integer, Integer> {@Overridepublic Integer createAccumulator() {return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {return accumulator + 1;}@Overridepublic Integer getResult(Integer accumulator) {return accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {return null;}}/*** 泛型如下:* 第一個(gè):輸入類型 = 增量函數(shù)的輸出 count值,Integer* 第二個(gè):輸出類型 = Tuple3(vc,count,windowEnd) ,帶上 窗口結(jié)束時(shí)間 的標(biāo)簽* 第三個(gè):key類型 , vc,Integer* 第四個(gè):窗口類型*/public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {@Overridepublic void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {// 迭代器里面只有一條數(shù)據(jù),next一次即可Integer count = elements.iterator().next();long windowEnd = context.window().getEnd();out.collect(Tuple3.of(key, count, windowEnd));}}public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {// 存不同窗口的 統(tǒng)計(jì)結(jié)果,key=windowEnd,value=list數(shù)據(jù)private Map<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;// 要取的Top數(shù)量private int threshold;public TopN(int threshold) {this.threshold = threshold;dataListMap = new HashMap<>();}@Overridepublic void processElement(Tuple3<Integer, Integer, Long> value, Context ctx, Collector<String> out) throws Exception {// 進(jìn)入這個(gè)方法,只是一條數(shù)據(jù),要排序,得到齊才行 ===》 存起來(lái),不同窗口分開(kāi)存// 1. 存到HashMap中Long windowEnd = value.f2;if (dataListMap.containsKey(windowEnd)) {// 1.1 包含vc,不是該vc的第一條,直接添加到List中List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);dataList.add(value);} else {// 1.1 不包含vc,是該vc的第一條,需要初始化listList<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>();dataList.add(value);dataListMap.put(windowEnd, dataList);}// 2. 注冊(cè)一個(gè)定時(shí)器, windowEnd+1ms即可(// 同一個(gè)窗口范圍,應(yīng)該同時(shí)輸出,只不過(guò)是一條一條調(diào)用processElement方法,只需要延遲1ms即可ctx.timerService().registerEventTimeTimer(windowEnd + 1);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);// 定時(shí)器觸發(fā),同一個(gè)窗口范圍的計(jì)算結(jié)果攢齊了,開(kāi)始 排序、取TopNLong windowEnd = ctx.getCurrentKey();// 1. 排序List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);dataList.sort(new Comparator<Tuple3<Integer, Integer, Long>>() {@Overridepublic int compare(Tuple3<Integer, Integer, Long> o1, Tuple3<Integer, Integer, Long> o2) {// 降序, 后 減 前return o2.f1 - o1.f1;}});// 2. 取TopNStringBuilder outStr = new StringBuilder();outStr.append("================================\n");// 遍歷 排序后的 List,取出前 threshold 個(gè), 考慮可能List不夠2個(gè)的情況 ==》 List中元素的個(gè)數(shù) 和 2 取最小值for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);outStr.append("Top" + (i + 1) + "\n");outStr.append("vc=" + vcCount.f0 + "\n");outStr.append("count=" + vcCount.f1 + "\n");outStr.append("窗口結(jié)束時(shí)間=" + vcCount.f2 + "\n");outStr.append("================================\n");}// 用完的List,及時(shí)清理,節(jié)省資源dataList.clear();out.collect(outStr.toString());}}
}
1.5 側(cè)輸出流(Side Output)
處理函數(shù)還有另外一個(gè)特有功能,就是將自定義的數(shù)據(jù)放入“側(cè)輸出流”(side output)輸出。這個(gè)概念我們并不陌生,之前在講到窗口處理遲到數(shù)據(jù)時(shí),最后一招就是輸出到側(cè)輸出流。而這種處理方式的本質(zhì),其實(shí)就是處理函數(shù)的側(cè)輸出流功能。
我們之前講到的絕大多數(shù)轉(zhuǎn)換算子,輸出的都是單一流,流里的數(shù)據(jù)類型只能有一種。而側(cè)輸出流可以認(rèn)為是“主流”上分叉出的“支流”,所以可以由一條流產(chǎn)生出多條流,而且這些流中的數(shù)據(jù)類型還可以不一樣。利用這個(gè)功能可以很容易地實(shí)現(xiàn)“分流”操作。
具體應(yīng)用時(shí),只要在處理函數(shù)的.processElement()或者.onTimer()方法中,調(diào)用上下文的.output()方法就可以了。
DataStream<Integer> stream = env.fromSource(...);OutputTag<String> outputTag = new OutputTag<String>("side-output") {};SingleOutputStreamOperator<Long> longStream = stream.process(new ProcessFunction<Integer, Long>() {@Overridepublic void processElement( Integer value, Context ctx, Collector<Integer> out) throws Exception {// 轉(zhuǎn)換成Long,輸出到主流中out.collect(Long.valueOf(value));// 轉(zhuǎn)換成String,輸出到側(cè)輸出流中ctx.output(outputTag, "side-output: " + String.valueOf(value));}
});
這里output()方法需要傳入兩個(gè)參數(shù),第一個(gè)是一個(gè)“輸出標(biāo)簽”O(jiān)utputTag,用來(lái)標(biāo)識(shí)側(cè)輸出流,一般會(huì)在外部統(tǒng)一聲明;第二個(gè)就是要輸出的數(shù)據(jù)。
我們可以在外部先將OutputTag聲明出來(lái):
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
如果想要獲取這個(gè)側(cè)輸出流,可以基于處理之后的DataStream直接調(diào)用.getSideOutput()方法,傳入對(duì)應(yīng)的OutputTag,這個(gè)方式與窗口API中獲取側(cè)輸出流是完全一樣的。
DataStream<String> stringStream = longStream.getSideOutput(outputTag);
案例需求:對(duì)每個(gè)傳感器,水位超過(guò)10的輸出告警信息
public class SideOutputDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));OutputTag<String> warnTag = new OutputTag<>("warn", Types.STRING);SingleOutputStreamOperator<WaterSensor> process = sensorDS.keyBy(sensor -> sensor.getId()).process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {// 使用側(cè)輸出流告警if (value.getVc() > 10) {ctx.output(warnTag, "當(dāng)前水位=" + value.getVc() + ",大于閾值10!!!");}// 主流正常 發(fā)送數(shù)據(jù)out.collect(value);}});process.print("主流");process.getSideOutput(warnTag).printToErr("warn");env.execute();}
}
??????????????????????您的支持是我創(chuàng)作的無(wú)限動(dòng)力
??????????????????????希望我能為您的未來(lái)盡綿薄之力
??????????????????????如有錯(cuò)誤,謝謝指正;若有收獲,謝謝贊美